Skip to content

Commit

Permalink
fix race condition when buffers are full
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Dec 13, 2021
1 parent 891ef26 commit a86350b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
3 changes: 2 additions & 1 deletion faust/_cython/streams.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ cdef class StreamIterator:
message.generation_id != self.app.consumer_generation_id:
self.app.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r",
"app generation_id is %r flow control.is_active %r",
message,
message.generation_id,
self.app.consumer_generation_id,
self.app.flow_control.is_active()
)
return None, self._skipped_value, stream_state

Expand Down
3 changes: 2 additions & 1 deletion faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,10 +979,11 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
value = skipped_value
self.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r",
"app generation_id is %r flow_control.is_active %r",
message,
message.generation_id,
self.app.consumer_generation_id,
self.app.flow_control.is_active()
)
break
if topic in acking_topics and not message.tracked:
Expand Down

0 comments on commit a86350b

Please sign in to comment.