Skip to content

Commit

Permalink
feat(ingest): add include_table_location_lineage flag for SQL common
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 4, 2023
1 parent 7e73ae7 commit 6bc8550
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,13 @@ def get_sql_alchemy_url(self):
SourceCapability.DATA_PROFILING,
"Optionally enabled via configuration. Profiling uses sql queries on whole table which can be expensive operation.",
)
+ @capability(SourceCapability.LINEAGE_COARSE, "Supported for S3 tables")
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
class AthenaSource(SQLAlchemySource):
"""
This plugin supports extracting the following metadata from Athena
- Tables, schemas etc.
+ - Lineage for S3 tables.
- Profiling when enabled.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def get_table_properties(
) -> Tuple[Optional[str], Dict[str, str], Optional[str]]:
description, properties, location_urn = super().get_table_properties(
inspector, schema, table
- ) # type:Tuple[Optional[str], Dict[str, str], Optional[str]]
+ )
# Update description if available.
db_name: str = self.get_db_name(inspector)
description = self.table_descriptions.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase):
default=True, description="Whether tables should be ingested."
)

+ include_table_location_lineage: bool = Field(
+ default=True,
+ description="If the source supports it, include table lineage to the underlying storage location.",
+ )
+
profiling: GEProfilingConfig = GEProfilingConfig()
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
Expand Down Expand Up @@ -841,7 +846,7 @@ def _process_table(
)
dataset_snapshot.aspects.append(dataset_properties)

- if location_urn:
+ if self.config.include_table_location_lineage and location_urn:
external_upstream_table = UpstreamClass(
dataset=location_urn,
type=DatasetLineageTypeClass.COPY,
Expand Down

0 comments on commit 6bc8550

Please sign in to comment.