diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py index e135fd33f6..5a5d1f8af9 100644 --- a/streampipes-client-python/setup.py +++ b/streampipes-client-python/setup.py @@ -35,6 +35,7 @@ "pydantic>=1.10.2", "requests>=2.28.1", "nats-py>=2.2.0", + "confluent-kafka>=2.0.2" ] dev_packages = base_packages + [ diff --git a/streampipes-client-python/streampipes/functions/broker/__init__.py b/streampipes-client-python/streampipes/functions/broker/__init__.py index 45ffbdc7c3..32d35fba3b 100644 --- a/streampipes-client-python/streampipes/functions/broker/__init__.py +++ b/streampipes-client-python/streampipes/functions/broker/__init__.py @@ -15,12 +15,14 @@ # limitations under the License. # from .broker import Broker +from .kafka_broker import KafkaBroker from .nats_broker import NatsBroker from .broker_handler import SupportedBroker, get_broker # isort: skip __all__ = [ "Broker", + "KafkaBroker", "NatsBroker", "SupportedBroker", "get_broker", diff --git a/streampipes-client-python/streampipes/functions/broker/broker_handler.py b/streampipes-client-python/streampipes/functions/broker/broker_handler.py index 2c05b839d7..3aac0f9865 100644 --- a/streampipes-client-python/streampipes/functions/broker/broker_handler.py +++ b/streampipes-client-python/streampipes/functions/broker/broker_handler.py @@ -16,7 +16,7 @@ # from enum import Enum -from streampipes.functions.broker import Broker, NatsBroker +from streampipes.functions.broker import Broker, KafkaBroker, NatsBroker from streampipes.model.resource.data_stream import DataStream @@ -24,6 +24,7 @@ class SupportedBroker(Enum): """Enum for the supported brokers.""" NATS = "NatsTransportProtocol" + KAFKA = "KafkaTransportProtocol" # TODO Exception should be removed once all brokers are implemented. 
@@ -49,5 +50,7 @@ def get_broker(data_stream: DataStream) -> Broker: # TODO implementation for mo broker_name = data_stream.event_grounding.transport_protocols[0].class_name if SupportedBroker.NATS.value in broker_name: return NatsBroker() + elif SupportedBroker.KAFKA.value in broker_name: + return KafkaBroker() else: raise UnsupportedBroker(f'The python client doesn\'t include the broker: "{broker_name}" yet') diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_broker.py b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py new file mode 100644 index 0000000000..900d7f6c15 --- /dev/null +++ b/streampipes-client-python/streampipes/functions/broker/kafka_broker.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+import logging
+from typing import Any, AsyncIterator, Dict
+
+from confluent_kafka import Consumer  # type: ignore
+from streampipes.functions.broker.broker import Broker
+from streampipes.functions.broker.kafka_message_fetcher import KafkaMessageFetcher
+from streampipes.model.common import random_letters
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaBroker(Broker):
+    """Implementation of the KafkaBroker"""
+
+    async def _makeConnection(self, hostname: str, port: int) -> None:
+        """Helper function to connect to a server.
+
+        Parameters
+        ----------
+
+        hostname: str
+            The hostname of the server, which the broker connects to.
+
+        port: int
+            The port number of the connection.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer = Consumer(
+            {"bootstrap.servers": f"{hostname}:{port}", "group.id": random_letters(6), "auto.offset.reset": "latest"}
+        )
+
+    async def createSubscription(self) -> None:
+        """Creates a subscription to a data stream.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.subscribe([self.topic_name])
+
+        logger.info(f"Subscribed to stream: {self.stream_id}")
+
+    async def publish_event(self, event: Dict[str, Any]):
+        """Publish an event to a connected data stream.
+
+        Parameters
+        ----------
+        event: Dict[str, Any]
+            The event to be published.
+
+        Returns
+        -------
+        None
+        """
+
+        pass  # TODO: publishing events to Kafka is not implemented yet
+
+    async def disconnect(self) -> None:
+        """Closes the connection to the server.
+
+        Returns
+        -------
+        None
+        """
+        self.kafka_consumer.close()
+        logger.info(f"Stopped connection to stream: {self.stream_id}")
+
+    def get_message(self) -> AsyncIterator:
+        """Get the published messages of the subscription.
+
+        Returns
+        -------
+        An async iterator for the messages.
+ """ + + return KafkaMessageFetcher(self.kafka_consumer) diff --git a/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py new file mode 100644 index 0000000000..56bbf46c9f --- /dev/null +++ b/streampipes-client-python/streampipes/functions/broker/kafka_message_fetcher.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+from confluent_kafka import Consumer  # type: ignore
+
+
+class KafkaMsg:
+    """An internal representation of a Kafka message
+
+    Parameters
+    ----------
+    data: Byte Array
+        The received Kafka message as byte array
+    """
+
+    def __init__(self, data):
+        self.data = data
+
+
+class KafkaMessageFetcher:
+    """Fetches the next message from Kafka
+
+    Parameters
+    ----------
+    consumer: Consumer
+        The Kafka consumer
+    """
+
+    def __init__(self, consumer: Consumer):
+        self.consumer = consumer
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        while (msg := self.consumer.poll(0.1)) is None: pass  # poll() returns None on timeout
+        return KafkaMsg(msg.value())
diff --git a/streampipes-client-python/streampipes/model/common.py b/streampipes-client-python/streampipes/model/common.py
index edc24b955d..a6c21aa84c 100644
--- a/streampipes-client-python/streampipes/model/common.py
+++ b/streampipes-client-python/streampipes/model/common.py
@@ -93,8 +93,8 @@ class EventProperty(BasicModel):
     description: Optional[StrictStr]
     runtime_name: StrictStr
     required: StrictBool = Field(default=False)
-    domain_properties: List[StrictStr] = Field(default_factory=list)
-    property_scope: StrictStr = Field(default="MEASUREMENT_PROPERTY")
+    domain_properties: Optional[List[StrictStr]] = Field(default_factory=list)
+    property_scope: Optional[StrictStr] = Field(default="MEASUREMENT_PROPERTY")
     index: StrictInt = Field(default=0)
     runtime_id: Optional[StrictStr]
     runtime_type: StrictStr = Field(default="http://www.w3.org/2001/XMLSchema#string")
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
index 1dcbe3a4a6..d72732afa3 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_measure.py
@@ -52,4 +52,4 @@ def convert_to_pandas_representation(self):
     pipeline_id: Optional[StrictStr]
     pipeline_name: Optional[StrictStr]
pipeline_is_running: StrictBool - schema_version: StrictStr + schema_version: Optional[StrictStr]