Skip to content

Commit

Permalink
[Integration][Kafka] Add consumer groups (#1333)
Browse files Browse the repository at this point in the history
  • Loading branch information
phalbert authored Jan 22, 2025
1 parent ff20ac6 commit f54b514
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 51 deletions.
10 changes: 9 additions & 1 deletion integrations/_infra/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ define deactivate_virtualenv
fi
endef

.SILENT: install install/prod install/local-core lint lint/fix run test clean
.SILENT: install install/prod install/local-core lint lint/fix run test clean seed

install:
$(call deactivate_virtualenv) && \
Expand Down Expand Up @@ -85,3 +85,11 @@ clean:
rm -rf .tox/
rm -rf docs/_build
rm -rf dist/

seed:
@if [ -f "tests/seed_data.py" ]; then \
$(ACTIVATE) && python tests/seed_data.py; \
else \
echo "No seeding script found. Create tests/seed_data.py for this integration if needed."; \
exit 0; \
fi
153 changes: 104 additions & 49 deletions integrations/kafka/.port/resources/blueprints.json
Original file line number Diff line number Diff line change
@@ -1,48 +1,48 @@
[
{
"identifier": "kafkaCluster",
"title": "Cluster",
"icon": "Kafka",
"schema": {
"properties": {
"controllerId": {
"title": "Controller ID",
"type": "string"
"identifier":"kafkaCluster",
"title":"Cluster",
"icon":"Kafka",
"schema":{
"properties":{
"controllerId":{
"title":"Controller ID",
"type":"string"
}
}
}
}
}
},
{
"identifier": "kafkaBroker",
"title": "Broker",
"icon": "Kafka",
"schema": {
"properties": {
"address": {
"title": "Address",
"type": "string"
},
"region": {
"title": "Region",
"type": "string"
},
"version": {
"title": "Version",
"type": "string"
},
"config": {
"title": "Config",
"type": "object"
"identifier":"kafkaBroker",
"title":"Broker",
"icon":"Kafka",
"schema":{
"properties":{
"address":{
"title":"Address",
"type":"string"
},
"region":{
"title":"Region",
"type":"string"
},
"version":{
"title":"Version",
"type":"string"
},
"config":{
"title":"Config",
"type":"object"
}
}
}
},
"relations": {
"cluster": {
"target": "kafkaCluster",
"required": true,
"many": false
}
}
},
"relations":{
"cluster":{
"target":"kafkaCluster",
"required":true,
"many":false
}
}
},
{
"identifier": "kafkaTopic",
Expand Down Expand Up @@ -85,15 +85,70 @@
},
"relations": {
"cluster": {
"target": "kafkaCluster",
"required": true,
"many": false
},
"brokers": {
"target": "kafkaBroker",
"required": false,
"many": true
}
}
"target":"kafkaCluster",
"required":true,
"many":false
},
"brokers":{
"target":"kafkaBroker",
"required":false,
"many":true
}
}
},
{
"identifier":"kafkaConsumerGroup",
"title":"Consumer Group",
"icon":"Kafka",
"schema":{
"properties":{
"state":{
"title":"State",
"type":"string",
"description":"The current state of the consumer group."
},
"members":{
"title":"Members",
"type":"array",
"description":"List of members in the consumer group.",
"items":{
"type":"string"
}
},
"coordinator":{
"title":"Coordinator",
"type":"number",
"description":"Broker ID of the coordinator for the consumer group."
},
"partition_assignor":{
"title":"Partition Assignor",
"type":"string",
"description":"Strategy used to assign partitions to consumers."
},
"is_simple_consumer_group":{
"title":"Is Simple Consumer Group",
"type":"boolean",
"description":"Indicates if the group is a simple consumer group."
},
"authorized_operations":{
"title":"Authorized Operations",
"type":"array",
"description":"List of operations authorized for the consumer group.",
"items":{
"type":"string"
}
}
}
},
"calculationProperties":{

},
"relations":{
"cluster":{
"target":"kafkaCluster",
"required":true,
"many":false
}
}
}
]
18 changes: 18 additions & 0 deletions integrations/kafka/.port/resources/port-app-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,21 @@ resources:
relations:
cluster: .cluster_name
brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
- kind: consumer_group
selector:
query: 'true'
port:
entity:
mappings:
identifier: .cluster_name + "_" + .group_id
title: .group_id
blueprint: '"kafkaConsumerGroup"'
properties:
state: .state
members: '[.members[].client_id]'
coordinator: .coordinator.id
partition_assignor: .partition_assignor
is_simple_consumer_group: .is_simple_consumer_group
authorized_operations: .authorized_operations
relations:
cluster: .cluster_name
1 change: 1 addition & 0 deletions integrations/kafka/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ features:
- kind: cluster
- kind: broker
- kind: topic
- kind: consumer_group
saas:
enabled: false
configurations:
Expand Down
8 changes: 8 additions & 0 deletions integrations/kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.114 (2025-01-22)


### Improvements

- Added support for ingesting consumer groups


## 0.1.113 (2025-01-22)


Expand Down
36 changes: 36 additions & 0 deletions integrations/kafka/examples/consumer_groups.entity.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"blueprint": "kafkaConsumerGroup",
"identifier": "local-cluster_conduktor_gateway",
"createdAt": "2025-01-17T14:23:38.182Z",
"updatedBy": "<port-client-id>",
"createdBy": "<port-client-id>",
"icon": null,
"team": [],
"title": "conduktor_gateway",
"relations": {
"cluster": "local-cluster"
},
"properties": {
"partition_assignor": "range",
"coordinator": null,
"authorized_operations": null,
"is_simple_consumer_group": false,
"members": [
{
"assignment": {
"topic_partitions": [
{
"partition": 0,
"topic": "_conduktor_gateway_license"
}
]
},
"host": "172.23.0.6",
"id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56",
"client_id": "conduktor-gateway_6969"
}
],
"state": "STABLE"
},
"updatedAt": "2025-01-17T14:23:38.182Z"
}
24 changes: 24 additions & 0 deletions integrations/kafka/examples/consumer_groups.response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"group_id": "_confluent-ksql-default_query_CTAS_PURCHASE_PER_PRODUCT_0",
"state": "EMPTY",
"members": [
{
"assignment": {
"topic_partitions": [
{
"partition": 0,
"topic": "_conduktor_gateway_license"
}
]
},
"host": "172.23.0.6",
"id": "conduktor-gateway_6969-707254dd-d7d6-4e34-9ac9-d868d5fd5f56",
"client_id": "conduktor-gateway_6969"
}
],
"cluster_name": "local-cluster",
"coordinator": 0,
"partition_assignor": "range",
"is_simple_consumer_group": false,
"authorized_operations": null
}
57 changes: 57 additions & 0 deletions integrations/kafka/kafka_integration/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any
from anyio import to_thread

import confluent_kafka # type: ignore

Expand Down Expand Up @@ -86,3 +87,59 @@ def describe_topics(self) -> list[dict[str, Any]]:
logger.error(f"Failed to describe topic {topic_name}: {e}")
raise e
return result_topics

async def describe_consumer_groups(self) -> list[dict[str, Any]]:
"""Describe all consumer groups in the cluster."""
result_groups: list[dict[str, Any]] = []

# List all consumer groups and wait for the future to complete
groups_metadata = await to_thread.run_sync(
self.kafka_admin_client.list_consumer_groups
)
groups_result = await to_thread.run_sync(groups_metadata.result)
group_ids = [group.group_id for group in groups_result.valid]

logger.info(f"Found {len(group_ids)} consumer groups")
if not group_ids:
return result_groups

# Describe the consumer groups
groups_description = await to_thread.run_sync(
self.kafka_admin_client.describe_consumer_groups, group_ids
)

for group_id, future in groups_description.items():
try:
group_info = await to_thread.run_sync(future.result)
members = [
{
"id": member.member_id,
"client_id": member.client_id,
"host": member.host,
"assignment": {
"topic_partitions": [
{"topic": tp.topic, "partition": tp.partition}
for tp in member.assignment.topic_partitions
]
},
}
for member in group_info.members
]

result_groups.append(
{
"group_id": group_id,
"state": group_info.state.name,
"members": members,
"cluster_name": self.cluster_name,
"coordinator": group_info.coordinator.id,
"partition_assignor": group_info.partition_assignor,
"is_simple_consumer_group": group_info.is_simple_consumer_group,
"authorized_operations": group_info.authorized_operations,
}
)
except Exception as e:
logger.error(f"Failed to describe consumer group {group_id}: {e}")
raise e

return result_groups
7 changes: 7 additions & 0 deletions integrations/kafka/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ async def resync_topics(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield kafka_client.describe_topics()


@ocean.on_resync("consumer_group")
async def resync_consumer_groups(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
kafka_clients = init_clients()
for kafka_client in kafka_clients:
yield await kafka_client.describe_consumer_groups()
2 changes: 1 addition & 1 deletion integrations/kafka/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kafka"
version = "0.1.113"
version = "0.1.114"
description = "Integration to import information from a Kafka cluster into Port. The integration supports importing metadata regarding the Kafka cluster, brokers and topics."
authors = ["Tal Sabag <[email protected]>"]

Expand Down
Loading

0 comments on commit f54b514

Please sign in to comment.