Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ingest): remove inferred args to MCPW, part 1 #6819

Merged
merged 3 commits into from
Dec 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions metadata-ingestion/as-a-library.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pip install -U `acryl-datahub[datahub-rest]`
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass
from datahub.metadata.schema_classes import DatasetPropertiesClass

from datahub.emitter.rest_emitter import DatahubRestEmitter

Expand All @@ -39,10 +39,7 @@ dataset_properties = DatasetPropertiesClass(description="This table stored the c

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
aspectName="datasetProperties",
aspect=dataset_properties,
)

Expand Down Expand Up @@ -75,7 +72,7 @@ pip install -U `acryl-datahub[datahub-kafka]`
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass
from datahub.metadata.schema_classes import DatasetPropertiesClass

from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
# Create an emitter to Kafka
Expand All @@ -100,10 +97,7 @@ dataset_properties = DatasetPropertiesClass(description="This table stored the c

# Construct a MetadataChangeProposalWrapper object.
metadata_event = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"),
aspectName="datasetProperties",
aspect=dataset_properties,
)

Expand Down
19 changes: 0 additions & 19 deletions metadata-ingestion/examples/library/dashboard_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
CalendarIntervalClass,
ChangeTypeClass,
DashboardUsageStatisticsClass,
DashboardUserUsageCountsClass,
TimeWindowSizeClass,
Expand All @@ -28,10 +27,7 @@
]

usage_day_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000
Expand All @@ -45,10 +41,7 @@

absolute_usage_as_of_day_1: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000
Expand Down Expand Up @@ -77,10 +70,7 @@
),
]
usage_day_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000
Expand All @@ -94,10 +84,7 @@

absolute_usage_as_of_day_2: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000
Expand All @@ -123,10 +110,7 @@
),
]
usage_day_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000
Expand All @@ -140,10 +124,7 @@

absolute_usage_as_of_day_3: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dashboard_urn("looker", "dashboards.999999"),
aspectName="dashboardUsageStatistics",
aspect=DashboardUsageStatisticsClass(
timestampMillis=round(
datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000
Expand Down
13 changes: 0 additions & 13 deletions metadata-ingestion/examples/library/data_quality_mcpw_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec


Expand All @@ -39,10 +38,7 @@ def assertionUrn(info: AssertionInfo) -> str:

def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
dataset_assertionRunEvent_mcp = MetadataChangeProposalWrapper(
entityType="assertion",
changeType=ChangeType.UPSERT,
entityUrn=assertionResult.assertionUrn,
aspectName="assertionRunEvent",
aspect=assertionResult,
)

Expand All @@ -58,10 +54,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
)
# Construct a MetadataChangeProposalWrapper object for dataset
dataset_mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeType.UPSERT,
entityUrn=datasetUrn("bazTable"),
aspectName="datasetProperties",
aspect=datasetProperties,
)

Expand Down Expand Up @@ -93,10 +86,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:

# Construct a MetadataChangeProposalWrapper object.
assertion_maxVal_mcp = MetadataChangeProposalWrapper(
entityType="assertion",
changeType=ChangeType.UPSERT,
entityUrn=assertionUrn(assertion_maxVal),
aspectName="assertionInfo",
aspect=assertion_maxVal,
)

Expand All @@ -110,10 +100,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:

# Construct a MetadataChangeProposalWrapper object for assertion platform
assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper(
entityType="assertion",
changeType=ChangeType.UPSERT,
entityUrn=assertionUrn(assertion_maxVal),
aspectName="dataPlatformInstance",
aspect=assertion_dataPlatformInstance,
)
# Emit Assertion entity platform aspect!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
EditableSchemaFieldInfoClass,
EditableSchemaMetadataClass,
GlossaryTermAssociationClass,
Expand Down Expand Up @@ -94,10 +93,7 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str:

if need_write:
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="editableSchemaMetadata",
aspect=current_editable_schema_metadata,
)
graph.emit(event)
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/examples/library/dataset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
DateTypeClass,
OtherSchemaClass,
SchemaFieldClass,
Expand All @@ -16,10 +15,7 @@
)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD"),
aspectName="schemaMetadata",
aspect=SchemaMetadataClass(
schemaName="customer", # not used
platform=make_data_platform_urn("hive"), # important <- platform must be an urn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
Expand All @@ -25,10 +24,7 @@
)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=make_dataset_urn(platform="hive", name="foodb.barTable", env="PROD"),
aspectName="schemaMetadata",
aspect=SchemaMetadataClass(
schemaName="customer", # not used
platform=make_data_platform_urn("hive"), # important <- platform must be an urn
Expand Down
9 changes: 1 addition & 8 deletions metadata-ingestion/examples/library/dataset_set_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,15 @@
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
ChangeTypeClass,
GlobalTagsClass,
TagAssociationClass,
)
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")
tag_urn = make_tag_urn("purchase")
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="globalTags",
aspect=GlobalTagsClass(tags=[TagAssociationClass(tag=tag_urn)]),
)

Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/examples/library/dataset_set_term.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
)
Expand All @@ -34,10 +33,7 @@
)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="glossaryTerms",
aspect=terms_aspect,
)
rest_emitter.emit(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dashboard import DashboardInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass

# Construct the DashboardInfo aspect with the charts -> dashboard lineage.
charts_in_dashboard: List[str] = [
Expand All @@ -25,10 +25,7 @@
# Construct a MetadataChangeProposalWrapper object with the DashboardInfo aspect.
# NOTE: This will overwrite all of the existing dashboard aspect information associated with this dashboard.
chart_info_mcp = MetadataChangeProposalWrapper(
entityType="dashboard",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dashboard_urn(platform="looker", name="my_dashboard_1"),
aspectName="dashboardInfo",
aspect=dashboard_info,
)

Expand Down
5 changes: 1 addition & 4 deletions metadata-ingestion/examples/library/lineage_dataset_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.chart import ChartInfoClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass
from datahub.metadata.schema_classes import ChangeAuditStampsClass

# Construct the ChartInfo aspect with the input_datasets lineage.
input_datasets: List[str] = [
Expand All @@ -24,10 +24,7 @@
# Construct a MetadataChangeProposalWrapper object with the ChartInfo aspect.
# NOTE: This will overwrite all of the existing chartInfo aspect information associated with this chart.
chart_info_mcp = MetadataChangeProposalWrapper(
entityType="chart",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_chart_urn(platform="looker", name="my_chart_1"),
aspectName="chartInfo",
aspect=chart_info,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInputOutputClass
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct the DataJobInputOutput aspect.
input_datasets: List[str] = [
Expand Down Expand Up @@ -33,12 +32,9 @@
# Construct a MetadataChangeProposalWrapper object.
# NOTE: This will overwrite all of the existing lineage information associated with this job.
datajob_input_output_mcp = MetadataChangeProposalWrapper(
entityType="dataJob",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_data_job_urn(
orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD"
),
aspectName="dataJobInputOutput",
aspect=datajob_input_output,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
)
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass
from datahub.metadata.schema_classes import DataJobInputOutputClass


def datasetUrn(tbl):
Expand Down Expand Up @@ -100,10 +100,7 @@ def fldUrn(tbl, fld):
)

dataJobLineageMcp = MetadataChangeProposalWrapper(
entityType="dataJob",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"),
aspectName="dataJobInputOutput",
aspect=dataJobInputOutput,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
Upstream,
UpstreamLineage,
)
from datahub.metadata.schema_classes import ChangeTypeClass


def datasetUrn(tbl):
Expand Down Expand Up @@ -75,10 +74,7 @@ def fldUrn(tbl, fld):
)

lineageMcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=datasetUrn("bar"),
aspectName="upstreamLineage",
aspect=fieldLineages,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
UpstreamClass,
UpstreamLineage,
)
from datahub.metadata.schema_classes import ChangeTypeClass

upstream_table_1 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
Expand All @@ -26,10 +25,7 @@

# Construct a MetadataChangeProposalWrapper object.
lineage_mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dataset_urn("bigquery", "downstream"),
aspectName="upstreamLineage",
aspect=upstream_lineage,
)

Expand Down
Loading