Skip to content

Commit

Permalink
Adding streaming scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
giacuong171 committed Dec 16, 2024
1 parent b9ef5a3 commit cd3ce7d
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
3 changes: 0 additions & 3 deletions stream_processing/kafka/readme.md

This file was deleted.

3 changes: 2 additions & 1 deletion stream_processing/scripts/json_consume_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main():

consumer.subscribe([topic])

# Read messages from Kafka
# # Read messages from Kafka
try:
while True:
# Wait for up to 1 second for new messages to arrive
Expand All @@ -44,6 +44,7 @@ def main():
except:
value = msg.value().decode("utf-8")
print(f"Received message: {value}")
break
except KeyboardInterrupt:
print("Aborted by user!\n")

Expand Down
4 changes: 3 additions & 1 deletion stream_processing/scripts/window_datastream_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def process(
context: ProcessWindowFunction.Context[TimeWindow],
elements: Iterable[tuple],
) -> Iterable[tuple]:

data_list = []
total_amounts = 0
passenger_counts = 0
Expand Down Expand Up @@ -75,7 +76,8 @@ def process(


if __name__ == "__main__":
JARS_PATH = f"jars"
JARS_PATH = f"{os.getcwd()}/jars/"
print(JARS_PATH)
servers = "localhost:9092"
producer = KafkaProducer(bootstrap_servers=servers)
admin_client = KafkaAdminClient(bootstrap_servers=servers)
Expand Down

0 comments on commit cd3ce7d

Please sign in to comment.