Skip to content

Commit

Permalink
refactor(ingest): prefer as imports instead of pegasus2avro files (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and shirshanka committed Sep 8, 2022
1 parent 08a4d34 commit 85c072d
Show file tree
Hide file tree
Showing 11 changed files with 16 additions and 23 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ include = '\.pyi?$'
target-version = ['py36', 'py37', 'py38']

[tool.isort]
combine_as_imports = true
indent = ' '
profile = 'black'
sections = 'FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER'
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/scripts/modeldocgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import (
BrowsePathsClass,
ChangeTypeClass,
Expand All @@ -30,6 +29,7 @@
GlobalTagsClass,
MetadataChangeEventClass,
OtherSchemaClass,
SchemaFieldClass as SchemaField,
SchemaFieldDataTypeClass,
SchemaMetadataClass,
StringTypeClass,
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/emitter/kafka_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
from datahub.metadata.schema_classes import (
MetadataChangeEventClass as MetadataChangeEvent,
MetadataChangeProposalClass as MetadataChangeProposal,
)
from datahub.metadata.schemas import (
getMetadataChangeEventSchema,
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.common import GlossaryTerms
from datahub.metadata.schema_classes import (
AuditStampClass,
ContainerKeyClass,
Expand All @@ -21,6 +20,7 @@
DatasetSnapshotClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
GlossaryTermsClass as GlossaryTerms,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
Expand All @@ -31,8 +31,8 @@
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.schema_classes import _Aspect as AspectAbstract
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger = logging.getLogger(__name__)
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import logging
import typing
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from dataclasses import dataclass, field as dataclass_field
from typing import (
Any,
Dict,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import datetime
import logging
import re
from dataclasses import dataclass
from dataclasses import field as dataclasses_field
from dataclasses import dataclass, field as dataclasses_field
from enum import Enum
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple, Union
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
import pathlib
import re
import sys
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from dataclasses import replace
from dataclasses import dataclass, field as dataclass_field, replace
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type

import pydantic
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
#########################################################

import logging
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from dataclasses import dataclass, field as dataclass_field
from enum import Enum
from time import sleep
from typing import Any, Dict, Iterable, List, Optional, Tuple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.types import DataType as SparkDataType
from pyspark.sql.types import (
DataType as SparkDataType,
DateType,
DecimalType,
DoubleType,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from collections import Counter
from typing import Any
from typing import Counter as CounterType
from typing import Dict, Sequence, Tuple, Union
from typing import Any, Counter as CounterType, Dict, Sequence, Tuple, Union

from mypy_extensions import TypedDict

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Type, Union, cast
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import datahub.emitter.mce_builder
from datahub.emitter.mce_builder import Aspect
Expand Down Expand Up @@ -30,7 +30,6 @@
StatusClass,
UpstreamLineageClass,
ViewPropertiesClass,
_Aspect,
)
from datahub.utilities.urns.urn import Urn

Expand Down Expand Up @@ -213,12 +212,12 @@ def _transform_or_record_mcp(
# remember stuff
assert envelope.record.entityUrn
assert isinstance(self, SingleAspectTransformer)
if envelope.record.aspectName == self.aspect_name():
if envelope.record.aspectName == self.aspect_name() and envelope.record.aspect:
# we have a match on the aspect name, call the specific transform function
transformed_aspect = self.transform_aspect(
entity_urn=envelope.record.entityUrn,
aspect_name=envelope.record.aspectName,
aspect=cast(_Aspect, envelope.record.aspect),
aspect=envelope.record.aspect,
)
self._mark_processed(envelope.record.entityUrn)
if transformed_aspect is None:
Expand Down

0 comments on commit 85c072d

Please sign in to comment.