Skip to content

Commit

Permalink
Merge pull request #637 from 42-julien451/master
Browse files Browse the repository at this point in the history
Tweak offset to voice the last messages only
  • Loading branch information
alexandregv authored Dec 21, 2023
2 parents f6b0040 + 7f74623 commit ec7bef3
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions porte.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ def create_consumer(kafka_servers, topic, group_id, username, password):
consumer = Consumer(conf)

# Subscribe to the topic
consumer.subscribe(topic)

def my_assign (consumer, partitions):
for p in partitions:
p.offset = OFFSET_END
print('assign', partitions)
consumer.assign(partitions)
# Subscribe to the topic
consumer.subscribe(topic, on_assign=my_assign)
return consumer

def consume_messages(consumer, building, welcome, goodbye):
Expand Down

0 comments on commit ec7bef3

Please sign in to comment.