feat(ingest): extract kafka topic config properties as customProperties
mayurinehate authored and szalai1 committed Dec 22, 2022
1 parent 7b274e7 commit 4e0a1b7
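
This change attaches topic-level metadata to each Kafka topic's dataset: partition count, replication factor, and a fixed set of topic configs (retention, cleanup policy, max message size, min in-sync replicas, unclean leader election) read via the Kafka admin client, emitted as customProperties on the DatasetProperties aspect. As a rough sketch (illustrative only; the topic name and values below are made up and not part of this commit), the resulting aspect would look like:

from datahub.metadata.schema_classes import DatasetPropertiesClass

# Hypothetical output for a topic named "purchase_events"; keys mirror the
# KafkaTopicConfigKeys enum plus the derived "Partitions" / "Replication Factor" entries.
dataset_properties = DatasetPropertiesClass(
    name="purchase_events",
    customProperties={
        "Partitions": "12",
        "Replication Factor": "3",
        "retention.ms": "604800000",
        "cleanup.policy": "delete",
        "max.message.bytes": "1048576",
    },
)

If the admin client cannot be created or a config lookup fails, the source logs a warning and emits the dataset without these extra properties.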
Showing 3 changed files with 686 additions and 523 deletions.
182 changes: 178 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -1,9 +1,20 @@
import concurrent.futures
import json
import logging
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Type
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Type

import confluent_kafka
import confluent_kafka.admin
import pydantic
from confluent_kafka.admin import (
AdminClient,
ConfigEntry,
ConfigResource,
ResourceType,
TopicMetadata,
)

from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
@@ -44,17 +55,28 @@
BrowsePathsClass,
ChangeTypeClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
SubTypesClass,
)
from datahub.utilities.registries.domain_registry import DomainRegistry

logger = logging.getLogger(__name__)


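# Topic-level Kafka config keys (named after the broker's topic config properties)
# that are surfaced as dataset custom properties when available.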
class KafkaTopicConfigKeys(str, Enum):
MIN_INSYNC_REPLICAS_CONFIG = "min.insync.replicas"
RETENTION_SIZE_CONFIG = "retention.bytes"
RETENTION_TIME_CONFIG = "retention.ms"
CLEANUP_POLICY_CONFIG = "cleanup.policy"
MAX_MESSAGE_SIZE_CONFIG = "max.message.bytes"
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
env: str = DEFAULT_ENV
# TODO: inline the connection config
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
domain: Dict[str, AllowDenyPattern] = pydantic.Field(
default={},
@@ -132,6 +154,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
**self.source_config.connection.consumer_config,
}
)
self.init_kafka_admin_client()
self.report: KafkaSourceReport = KafkaSourceReport()
self.schema_registry_client: KafkaSchemaRegistryBase = (
KafkaSource.create_schema_registry(config, self.report)
@@ -150,6 +173,23 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
run_id=self.ctx.run_id,
)

def init_kafka_admin_client(self) -> None:
try:
# TODO: Do we require separate config than existing consumer_config ?
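            # The admin client is only used to read per-topic configs via describe_configs();
            # if creation fails, a warning is reported and ingestion continues without
            # the extra config properties.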
self.admin_client = AdminClient(
{
"group.id": "test",
"bootstrap.servers": self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
}
)
except Exception as e:
logger.debug(e, exc_info=e)
self.report.report_warning(
"kafka-admin-client",
f"Failed to create Kafka Admin Client due to error {e}.",
)

def get_platform_instance_id(self) -> str:
assert self.source_config.platform_instance is not None
return self.source_config.platform_instance
@@ -161,10 +201,12 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
topics = self.consumer.list_topics().topics
for t in topics:
extra_topic_details = self.fetch_extra_topic_details(topics.keys())

for t, t_detail in topics.items():
self.report.report_topic_scanned(t)
if self.source_config.topic_patterns.allowed(t):
yield from self._extract_record(t)
yield from self._extract_record(t, t_detail, extra_topic_details.get(t))
# add topic to checkpoint
topic_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
@@ -180,7 +222,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:
def _extract_record(
self,
topic: str,
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Iterable[MetadataWorkUnit]:
logger.debug(f"topic = {topic}")

# 1. Create the default dataset snapshot for the topic.
@@ -215,6 +262,16 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:
)
dataset_snapshot.aspects.append(browse_path)

custom_props = self.build_custom_properties(
topic, topic_detail, extra_topic_config
)

dataset_properties = DatasetPropertiesClass(
name=topic,
customProperties=custom_props,
)
dataset_snapshot.aspects.append(dataset_properties)

# 4. Attach dataPlatformInstance aspect.
if self.source_config.platform_instance:
dataset_snapshot.aspects.append(
@@ -265,10 +322,127 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:
self.report.report_workunit(wu)
yield wu

def build_custom_properties(
self,
topic: str,
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Dict[str, str]:

custom_props: Dict[str, str] = {}
self.update_custom_props_with_topic_details(topic, topic_detail, custom_props)
self.update_custom_props_with_topic_config(
topic, extra_topic_config, custom_props
)
return custom_props

def update_custom_props_with_topic_details(
self,
topic: str,
topic_detail: Optional[TopicMetadata],
custom_props: Dict[str, str],
) -> None:
if topic_detail is None or topic_detail.partitions is None:
logger.info(
f"Partitions and Replication Factor not available for topic {topic}"
)
return

custom_props["Partitions"] = str(len(topic_detail.partitions))
replication_factor: Optional[int] = None
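        # Use the largest replica count observed across the topic's partitions as its replication factor.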
for _, p_meta in topic_detail.partitions.items():
if replication_factor is None or len(p_meta.replicas) > replication_factor:
replication_factor = len(p_meta.replicas)

if replication_factor is not None:
custom_props["Replication Factor"] = str(replication_factor)

def update_custom_props_with_topic_config(
self,
topic: str,
topic_config: Optional[Dict[str, ConfigEntry]],
custom_props: Dict[str, str],
) -> None:
if topic_config is None:
return

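        # customProperties values must be strings, so any non-string config value is JSON-encoded.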
for config_key in KafkaTopicConfigKeys:
try:
if (
config_key in topic_config.keys()
and topic_config[config_key] is not None
):
config_value = topic_config[config_key].value
custom_props[config_key] = (
config_value
if isinstance(config_value, str)
else json.dumps(config_value)
)
except Exception as e:
                logger.info(f"Config {config_key} is not available for topic {topic} due to error {e}")

def get_report(self) -> KafkaSourceReport:
return self.report

def close(self) -> None:
if self.consumer:
self.consumer.close()
super().close()

    def _get_config_value_if_present(
        self, config_dict: Dict[str, ConfigEntry], key: str
    ) -> Any:
        # Return the entry's value when the key is present, otherwise None.
        return config_dict[key].value if key in config_dict else None

def fetch_extra_topic_details(self, topics: List[str]) -> Dict[str, dict]:
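        # Returns a mapping of topic name -> {config name: ConfigEntry}; empty when the
        # admin client is unavailable or the lookup fails.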
extra_topic_details = {}

if not hasattr(self, "admin_client"):
logger.debug(
"Kafka Admin Client missing. Not fetching config details for topics."
)
else:
try:
extra_topic_details = self.fetch_topic_configurations(topics)
except Exception as e:
logger.debug(e, exc_info=e)
logger.warning(f"Failed to fetch config details due to error {e}.")
return extra_topic_details

def fetch_topic_configurations(self, topics: List[str]) -> Dict[str, dict]:
logger.info("Fetching config details for all topics")

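        # describe_configs() returns one future per ConfigResource; wait for all of them
        # before collecting per-topic results.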
configs: Dict[
ConfigResource, concurrent.futures.Future
] = self.admin_client.describe_configs(
resources=[ConfigResource(ResourceType.TOPIC, t) for t in topics]
)
logger.debug("Waiting for config details futures to complete")
concurrent.futures.wait(configs.values())
logger.debug("Config details futures completed")

topic_configurations: Dict[str, dict] = {}
for config_resource, config_result_future in configs.items():
self.process_topic_config_result(
config_resource, config_result_future, topic_configurations
)
return topic_configurations

def process_topic_config_result(
self,
config_resource: ConfigResource,
config_result_future: concurrent.futures.Future,
topic_configurations: dict,
) -> None:
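        # On success the future resolves to a Dict[str, ConfigEntry] for the topic;
        # failures are logged as warnings and the topic is skipped.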
try:
assert config_result_future.done()
assert config_result_future.exception() is None
topic_configurations[config_resource.name] = config_result_future.result()
except Exception as e:
logger.warning(
f"Config details for topic {config_resource.name} not fetched due to error {e}"
)
else:
logger.info(
f"Config details for topic {config_resource.name} fetched successfully"
)