From cd3ce7da7d4c4937f3fe2667dfdebc499108b917 Mon Sep 17 00:00:00 2001 From: Cuong Pham Date: Mon, 16 Dec 2024 15:01:20 +0100 Subject: [PATCH] Adding streaming scripts --- stream_processing/kafka/readme.md | 3 --- stream_processing/scripts/json_consume_message.py | 3 ++- stream_processing/scripts/window_datastream_api.py | 4 +++- 3 files changed, 5 insertions(+), 5 deletions(-) delete mode 100644 stream_processing/kafka/readme.md diff --git a/stream_processing/kafka/readme.md b/stream_processing/kafka/readme.md deleted file mode 100644 index 4542cdd..0000000 --- a/stream_processing/kafka/readme.md +++ /dev/null @@ -1,3 +0,0 @@ -docker build -t nyc_producer:latest . -docker image tag nyc_producer:latest luongphambao/nyc_producer:latest -docker push luongphambao/nyc_producer:latest \ No newline at end of file diff --git a/stream_processing/scripts/json_consume_message.py b/stream_processing/scripts/json_consume_message.py index 5f8b85a..9c5fd07 100644 --- a/stream_processing/scripts/json_consume_message.py +++ b/stream_processing/scripts/json_consume_message.py @@ -26,7 +26,7 @@ def main(): consumer.subscribe([topic]) - # Read messages from Kafka + # # Read messages from Kafka try: while True: # Wait for up to 1 second for new messages to arrive @@ -44,6 +44,7 @@ def main(): except: value = msg.value().decode("utf-8") print(f"Received message: {value}") + break except KeyboardInterrupt: print("Aborted by user!\n") diff --git a/stream_processing/scripts/window_datastream_api.py b/stream_processing/scripts/window_datastream_api.py index b0a7642..03de865 100644 --- a/stream_processing/scripts/window_datastream_api.py +++ b/stream_processing/scripts/window_datastream_api.py @@ -46,6 +46,7 @@ def process( context: ProcessWindowFunction.Context[TimeWindow], elements: Iterable[tuple], ) -> Iterable[tuple]: + data_list = [] total_amounts = 0 passenger_counts = 0 @@ -75,7 +76,8 @@ def process( if __name__ == "__main__": - JARS_PATH = f"jars" + JARS_PATH = f"{os.getcwd()}/jars/" + print(JARS_PATH) servers = "localhost:9092" producer = KafkaProducer(bootstrap_servers=servers) admin_client = KafkaAdminClient(bootstrap_servers=servers)