Skip to content

Commit

Permalink
Fix error messages in faust app #166
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Dec 13, 2021
1 parent a86350b commit 28d0db2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
6 changes: 4 additions & 2 deletions faust/_cython/streams.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ cdef class StreamIterator:
offset = message.offset
consumer = self.consumer

if not self.app.flow_control.is_active() or\
message.generation_id != self.app.consumer_generation_id:
if (
not self.app.flow_control.is_active()
or message.generation_id != self.app.consumer_generation_id
):
self.app.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r flow control.is_active %r",
Expand Down
8 changes: 5 additions & 3 deletions faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,16 +974,18 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
tp = message.tp
offset = message.offset

if not self.app.flow_control.is_active() or\
message.generation_id != self.app.consumer_generation_id:
if (
not self.app.flow_control.is_active()
or message.generation_id != self.app.consumer_generation_id
):
value = skipped_value
self.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r flow_control.is_active %r",
message,
message.generation_id,
self.app.consumer_generation_id,
self.app.flow_control.is_active()
self.app.flow_control.is_active(),
)
break
if topic in acking_topics and not message.tracked:
Expand Down

0 comments on commit 28d0db2

Please sign in to comment.