diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py index 0b201278142e3a..23a99ccb310e13 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka_connect.py @@ -282,10 +282,6 @@ class JdbcParser: query: str transforms: list - def report_warning(self, key: str, reason: str) -> None: - logger.warning(f"{key}: {reason}") - self.report.report_warning(key, reason) - def get_parser( self, connector_manifest: ConnectorManifest, @@ -355,9 +351,9 @@ def default_get_lineages( source_table = f"{table_name_tuple[-2]}.{source_table}" else: include_source_dataset = False - self.report_warning( - self.connector_manifest.name, - f"could not find schema for table {source_table}", + self.report.warning( + "Could not find schema for table", + f"{self.connector_manifest.name} : {source_table}", ) dataset_name: str = get_dataset_name(database_name, source_table) lineage = KafkaConnectLineage( @@ -457,9 +453,9 @@ def _extract_lineages(self): target_platform=KAFKA, ) lineages.append(lineage) - self.report_warning( + self.report.warning( + "Could not find input dataset, the connector has query configuration set", self.connector_manifest.name, - "could not find input dataset, the connector has query configuration set", ) self.connector_manifest.lineages = lineages return @@ -535,24 +531,24 @@ def _extract_lineages(self): include_source_dataset=False, ) ) - self.report_warning( - self.connector_manifest.name, - f"could not find input dataset, for connector topics {topic_names}", + self.report.warning( + "Could not find input dataset for connector topics", + f"{self.connector_manifest.name} : {topic_names}", ) self.connector_manifest.lineages = lineages return else: include_source_dataset = True if SINGLE_TRANSFORM and UNKNOWN_TRANSFORM: - self.report_warning( - self.connector_manifest.name, - 
f"could not find input dataset, connector has unknown transform - {transforms[0]['type']}", + self.report.warning( + "Could not find input dataset, connector has unknown transform", + f"{self.connector_manifest.name} : {transforms[0]['type']}", ) include_source_dataset = False if not SINGLE_TRANSFORM and UNKNOWN_TRANSFORM: - self.report_warning( + self.report.warning( + "Could not find input dataset, connector has one or more unknown transforms", self.connector_manifest.name, - "could not find input dataset, connector has one or more unknown transforms", ) include_source_dataset = False lineages = self.default_get_lineages( @@ -753,8 +749,10 @@ def _extract_lineages(self): lineages.append(lineage) self.connector_manifest.lineages = lineages except Exception as e: - self.report.report_warning( - self.connector_manifest.name, f"Error resolving lineage: {e}" + self.report.warning( + "Error resolving lineage for connector", + self.connector_manifest.name, + exc=e, ) return @@ -783,10 +781,6 @@ class BQParser: defaultDataset: Optional[str] = None version: str = "v1" - def report_warning(self, key: str, reason: str) -> None: - logger.warning(f"{key}: {reason}") - self.report.report_warning(key, reason) - def get_parser( self, connector_manifest: ConnectorManifest, @@ -917,9 +911,9 @@ def _extract_lineages(self): transformed_topic = self.apply_transformations(topic, transforms) dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser) if dataset_table is None: - self.report_warning( - self.connector_manifest.name, - f"could not find target dataset for topic {transformed_topic}, please check your connector configuration", + self.report.warning( + "Could not find target dataset for topic, please check your connector configuration", + f"{self.connector_manifest.name} : {transformed_topic} ", ) continue target_dataset = f"{project}.{dataset_table}" @@ -954,10 +948,6 @@ class SnowflakeParser: schema_name: str topics_to_tables: Dict[str, str] - def 
report_warning(self, key: str, reason: str) -> None: - logger.warning(f"{key}: {reason}") - self.report.report_warning(key, reason) - def get_table_name_from_topic_name(self, topic_name: str) -> str: """ This function converts the topic name to a valid Snowflake table name using some rules. @@ -1105,8 +1095,10 @@ def _extract_lineages(self): ) self.connector_manifest.lineages = lineages except Exception as e: - self.report.report_warning( - self.connector_manifest.name, f"Error resolving lineage: {e}" + self.report.warning( + "Error resolving lineage for connector", + self.connector_manifest.name, + exc=e, ) return @@ -1155,7 +1147,7 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext): ) self.session.auth = (self.config.username, self.config.password) - test_response = self.session.get(f"{self.config.connect_uri}") + test_response = self.session.get(f"{self.config.connect_uri}/connectors") test_response.raise_for_status() logger.info(f"Connection to {self.config.connect_uri} is ok") if not jpype.isJVMStarted(): @@ -1178,13 +1170,16 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: payload = connector_response.json() - for c in payload: - connector_url = f"{self.config.connect_uri}/connectors/{c}" - connector_response = self.session.get(connector_url) - manifest = connector_response.json() - connector_manifest = ConnectorManifest(**manifest) - if not self.config.connector_patterns.allowed(connector_manifest.name): - self.report.report_dropped(connector_manifest.name) + for connector_name in payload: + connector_url = f"{self.config.connect_uri}/connectors/{connector_name}" + connector_manifest = self._get_connector_manifest( + connector_name, connector_url + ) + if ( + connector_manifest is None + or not self.config.connector_patterns.allowed(connector_manifest.name) + ): + self.report.report_dropped(connector_name) continue if self.config.provided_configs: @@ -1195,19 +1190,11 @@ def get_connectors_manifest(self) -> 
List[ConnectorManifest]: connector_manifest.lineages = list() connector_manifest.url = connector_url - topics = self.session.get( - f"{self.config.connect_uri}/connectors/{c}/topics", - ).json() - - connector_manifest.topic_names = topics[c]["topics"] + connector_manifest.topic_names = self._get_connector_topics(connector_name) # Populate Source Connector metadata if connector_manifest.type == SOURCE: - tasks = self.session.get( - f"{self.config.connect_uri}/connectors/{c}/tasks", - ).json() - - connector_manifest.tasks = tasks + connector_manifest.tasks = self._get_connector_tasks(connector_name) # JDBC source connector lineages if connector_manifest.config.get(CONNECTOR_CLASS).__eq__( @@ -1246,7 +1233,7 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: ) continue - for topic in topics: + for topic in connector_manifest.topic_names: lineage = KafkaConnectLineage( source_dataset=target_connector.source_dataset, source_platform=target_connector.source_platform, @@ -1286,6 +1273,49 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]: return connectors_manifest + def _get_connector_manifest( + self, connector_name: str, connector_url: str + ) -> Optional[ConnectorManifest]: + try: + connector_response = self.session.get(connector_url) + connector_response.raise_for_status() + except Exception as e: + self.report.warning( + "Failed to get connector details", connector_name, exc=e + ) + return None + manifest = connector_response.json() + connector_manifest = ConnectorManifest(**manifest) + return connector_manifest + + def _get_connector_tasks(self, connector_name: str) -> dict: + try: + response = self.session.get( + f"{self.config.connect_uri}/connectors/{connector_name}/tasks", + ) + response.raise_for_status() + except Exception as e: + self.report.warning( + "Error getting connector tasks", context=connector_name, exc=e + ) + return {} + + return response.json() + + def _get_connector_topics(self, connector_name: str) -> List[str]: + 
try: + response = self.session.get( + f"{self.config.connect_uri}/connectors/{connector_name}/topics", + ) + response.raise_for_status() + except Exception as e: + self.report.warning( + "Error getting connector topics", context=connector_name, exc=e + ) + return [] + + return response.json()[connector_name]["topics"] + def construct_flow_workunit(self, connector: ConnectorManifest) -> MetadataWorkUnit: connector_name = connector.name connector_type = connector.type