diff --git a/faust/_cython/streams.pyx b/faust/_cython/streams.pyx index 6a49a6800..4e8952d51 100644 --- a/faust/_cython/streams.pyx +++ b/faust/_cython/streams.pyx @@ -160,8 +160,10 @@ cdef class StreamIterator: offset = message.offset consumer = self.consumer - if not self.app.flow_control.is_active() or\ - message.generation_id != self.app.consumer_generation_id: + if ( + not self.app.flow_control.is_active() + or message.generation_id != self.app.consumer_generation_id + ): self.app.log.dev( "Skipping message %r with generation_id %r because " "app generation_id is %r flow control.is_active %r", diff --git a/faust/streams.py b/faust/streams.py index c6e2335a8..dfbc2a3a9 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -974,8 +974,10 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: tp = message.tp offset = message.offset - if not self.app.flow_control.is_active() or\ - message.generation_id != self.app.consumer_generation_id: + if ( + not self.app.flow_control.is_active() + or message.generation_id != self.app.consumer_generation_id + ): value = skipped_value self.log.dev( "Skipping message %r with generation_id %r because " @@ -983,7 +985,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: message, message.generation_id, self.app.consumer_generation_id, - self.app.flow_control.is_active() + self.app.flow_control.is_active(), ) break if topic in acking_topics and not message.tracked: