Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Kafka] Convert client methods to async #1349

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integrations/kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.117 (2025-01-24)


### Improvements

- Converted client methods to async to prevent blocking operations


## 0.1.116 (2025-01-23)


Expand Down
23 changes: 14 additions & 9 deletions integrations/kafka/kafka_integration/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,27 @@ def __init__(self, cluster_name: str, conf: dict[str, Any]):
self.kafka_admin_client = AdminClient(conf)
self.cluster_metadata = self.kafka_admin_client.list_topics()

def describe_cluster(self) -> dict[str, Any]:
async def describe_cluster(self) -> dict[str, Any]:
return {
"name": self.cluster_name,
"controller_id": self.cluster_metadata.controller_id,
}

def describe_brokers(self) -> list[dict[str, Any]]:
async def describe_brokers(self) -> list[dict[str, Any]]:
result_brokers = []
for broker in self.cluster_metadata.brokers.values():
brokers_configs = self.kafka_admin_client.describe_configs(
[ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, str(broker.id))]
brokers_configs = await to_thread.run_sync(
self.kafka_admin_client.describe_configs,
[ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, str(broker.id))],
)
for broker_config_resource, future in brokers_configs.items():
broker_id = broker_config_resource.name
try:
broker_config = {
key: value.value for key, value in future.result().items()
key: value.value
for key, value in (
await to_thread.run_sync(future.result)
).items()
}
result_brokers.append(
{
Expand All @@ -44,7 +48,7 @@ def describe_brokers(self) -> list[dict[str, Any]]:
raise e
return result_brokers

def describe_topics(self) -> list[dict[str, Any]]:
async def describe_topics(self) -> list[dict[str, Any]]:
result_topics = []
topics_config_resources = []
topics_metadata_dict = {}
Expand All @@ -55,14 +59,15 @@ def describe_topics(self) -> list[dict[str, Any]]:
)
topics_metadata_dict[topic.topic] = topic

topics_configs = self.kafka_admin_client.describe_configs(
topics_config_resources
topics_configs = await to_thread.run_sync(
self.kafka_admin_client.describe_configs, topics_config_resources
)
for topic_config_resource, future in topics_configs.items():
topic_name = topic_config_resource.name
try:
topic_config = {
key: value.value for key, value in future.result().items()
key: value.value
for key, value in (await to_thread.run_sync(future.result)).items()
}
partitions = [
{
Expand Down
6 changes: 3 additions & 3 deletions integrations/kafka/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ def init_clients() -> list[KafkaClient]:
async def resync_cluster(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield [kafka_client.describe_cluster()]
yield [await kafka_client.describe_cluster()]


@ocean.on_resync("broker")
async def resync_brokers(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield kafka_client.describe_brokers()
yield await kafka_client.describe_brokers()


@ocean.on_resync("topic")
async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield kafka_client.describe_topics()
yield await kafka_client.describe_topics()


@ocean.on_resync("consumer_group")
Expand Down
2 changes: 1 addition & 1 deletion integrations/kafka/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kafka"
version = "0.1.116"
version = "0.1.117"
description = "Integration to import information from a Kafka cluster into Port. The integration supports importing metadata regarding the Kafka cluster, brokers and topics."
authors = ["Tal Sabag <[email protected]>"]

Expand Down
Loading