Skip to content

Commit

Permalink
fix race condition when buffers are full
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Dec 13, 2021
1 parent 9a298ac commit 891ef26
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
3 changes: 2 additions & 1 deletion faust/_cython/streams.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ cdef class StreamIterator:
offset = message.offset
consumer = self.consumer

if message.generation_id != self.app.consumer_generation_id:
if not self.app.flow_control.is_active() or\
message.generation_id != self.app.consumer_generation_id:
self.app.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r",
Expand Down
3 changes: 2 additions & 1 deletion faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,8 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
tp = message.tp
offset = message.offset

if message.generation_id != self.app.consumer_generation_id:
if not self.app.flow_control.is_active() or\
message.generation_id != self.app.consumer_generation_id:
value = skipped_value
self.log.dev(
"Skipping message %r with generation_id %r because "
Expand Down

0 comments on commit 891ef26

Please sign in to comment.