From 6bc85502bae5425647953753ef0f931c3c349d8a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 4 Jan 2023 14:30:33 -0500 Subject: [PATCH] feat(ingest): add `include_table_location_lineage` flag for SQL common (#6934) --- .../src/datahub/ingestion/source/sql/athena.py | 2 ++ .../src/datahub/ingestion/source/sql/mssql.py | 2 +- .../src/datahub/ingestion/source/sql/sql_common.py | 7 ++++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index b8f341cf27733c..fc6c821bf90dbd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -88,11 +88,13 @@ def get_sql_alchemy_url(self): SourceCapability.DATA_PROFILING, "Optionally enabled via configuration. Profiling uses sql queries on whole table which can be expensive operation.", ) +@capability(SourceCapability.LINEAGE_COARSE, "Supported for S3 tables") @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") class AthenaSource(SQLAlchemySource): """ This plugin supports extracting the following metadata from Athena - Tables, schemas etc. + - Lineage for S3 tables. - Profiling when enabled. """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py index 0f0e11310e65cb..42a2511563dc46 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py @@ -203,7 +203,7 @@ def get_table_properties( ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: description, properties, location_urn = super().get_table_properties( inspector, schema, table - ) # type:Tuple[Optional[str], Dict[str, str], Optional[str]] + ) # Update description if available. db_name: str = self.get_db_name(inspector) description = self.table_descriptions.get( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 6db79f2fb6f69a..f9454fcf78845d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -258,6 +258,11 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase): default=True, description="Whether tables should be ingested." ) + include_table_location_lineage: bool = Field( + default=True, + description="If the source supports it, include table lineage to the underlying storage location.", + ) + profiling: GEProfilingConfig = GEProfilingConfig() # Custom Stateful Ingestion settings stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None @@ -841,7 +846,7 @@ def _process_table( ) dataset_snapshot.aspects.append(dataset_properties) - if location_urn: + if self.config.include_table_location_lineage and location_urn: external_upstream_table = UpstreamClass( dataset=location_urn, type=DatasetLineageTypeClass.COPY,