From b06e5799653d75021aa8a409c3d3e6f83a266f1f Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 6 Jul 2021 13:27:02 -0400 Subject: [PATCH] Fix error messages in faust app #166 --- faust/transport/drivers/aiokafka.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index be922a85f..dde5a6b18 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -722,8 +722,10 @@ def verify_event_path(self, now: float, tp: TP) -> None: secs_since_started = now - self.time_started if monitor is not None: # need for .stream_inbound_time - highwater = self.highwater(tp) - committed_offset = parent._committed_offset.get(tp) + aiotp = TopicPartition(tp.topic, tp.partition) + tp_state = self._ensure_consumer()._fetcher._subscriptions.subscription.assignment.state_value(aiotp) + highwater = tp_state.highwater + committed_offset = tp_state.position has_acks = acks_enabled_for(tp.topic) if highwater is None: if secs_since_started >= self.tp_stream_timeout_secs: