From c7974e52ff82467e879e66733e0f66b799bf5f33 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 24 Nov 2020 12:26:30 -0500 Subject: [PATCH 1/2] Bumping version to 0.2.2rc6 --- CHANGELOG.md | 2 +- faust/__init__.py | 2 +- faust/tables/recovery.py | 9 ++++++--- faust/transport/conductor.py | 30 +++++++++++++++++++----------- faust/transport/consumer.py | 2 +- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07e38ba72..e91381344 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 0.2.2 ### Fixed - Consumer offsets not progressing for certain partitions -- Agent dies silenty when mode cancels pending tasks +- Agent dies silenty when mode cancels pending tasks [678](https://github.com/robinhood/faust/issues/678) ## 0.2.1 diff --git a/faust/__init__.py b/faust/__init__.py index cc4f6096d..1009abfaf 100644 --- a/faust/__init__.py +++ b/faust/__init__.py @@ -23,7 +23,7 @@ import typing from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple -__version__ = "0.2.2" +__version__ = "0.2.2rc6" __author__ = "Robinhood Markets, Inc." __contact__ = "schrohm@gmail.com, vpatki@wayfair.com" __homepage__ = "https://github.com/faust-streaming/faust" diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index a01f96082..ea03d3214 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -552,7 +552,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]: return None async def _wait(self, coro: WaitArgT, timeout: int = None) -> None: - signal = self.signal_recovery_start.wait() + signal = self.signal_recovery_start wait_result = await self.wait_first(coro, signal, timeout=timeout) if wait_result.stopped: # service was stopped. @@ -737,7 +737,7 @@ def _maybe_signal_recovery_end() -> None: message = event.message tp = message.tp offset = message.offset - + logger.debug(f"Recovery message topic {tp} offset {offset}") offsets: Counter[TP] bufsize = buffer_sizes.get(tp) is_active = False @@ -755,9 +755,12 @@ def _maybe_signal_recovery_end() -> None: bufsize = buffer_sizes[tp] = table.standby_buffer_size standby_events_received_at[tp] = now else: - continue + logger.warning(f"recovery unknown topic {tp} offset {offset}") seen_offset = offsets.get(tp, None) + logger.debug( + f"seen offset for {tp} is {seen_offset} message offset {offset}" + ) if seen_offset is None or offset > seen_offset: offsets[tp] = offset buf = buffers[table] diff --git a/faust/transport/conductor.py b/faust/transport/conductor.py index 74b5a7346..ba24bee5f 100644 --- a/faust/transport/conductor.py +++ b/faust/transport/conductor.py @@ -124,6 +124,7 @@ async def on_message(message: Message) -> None: # so that if a DecodeError is raised we can propagate # that error to the remaining channels. delivered: Set[_Topic] = set() + full: typing.List[Tuple[EventT, _Topic]] = [] try: for chan in channels: keyid = chan.key_type, chan.value_type @@ -133,11 +134,10 @@ async def on_message(message: Message) -> None: event_keyid = keyid queue = chan.queue - queue.put_nowait_enhanced( - event, - on_pressure_high=on_pressure_high, - on_pressure_drop=on_pressure_drop, - ) + if queue.full(): + full.append((event, chan)) + continue + queue.put_nowait(event) else: # subsequent channels may have a different # key/value type pair, meaning they all can @@ -150,13 +150,21 @@ async def on_message(message: Message) -> None: else: dest_event = await chan.decode(message, propagate=True) queue = chan.queue - queue.put_nowait_enhanced( - dest_event, - on_pressure_high=on_pressure_high, - on_pressure_drop=on_pressure_drop, - ) + if queue.full(): + full.append((dest_event, chan)) + continue + queue.put_nowait(dest_event) delivered.add(chan) - + if full: + for _, dest_chan in full: + on_topic_buffer_full(dest_chan) + await asyncio.wait( + [ + dest_chan.put(dest_event) + for dest_event, dest_chan in full + ], + return_when=asyncio.ALL_COMPLETED, + ) except KeyDecodeError as exc: remaining = channels - delivered message.ack(app.consumer, n=len(remaining)) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 00410d929..6b8be6e43 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -1031,7 +1031,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: acked[: len(batch)] = [] self._acked_index[tp].difference_update(batch) # return the highest commit offset - return batch[-1] + 1 + return batch[-1] return None async def on_task_error(self, exc: BaseException) -> None: From dce1331ad39533905b97f82f80bdd3f58a741739 Mon Sep 17 00:00:00 2001 From: Vikram Patki 24489 Date: Tue, 24 Nov 2020 15:56:44 -0500 Subject: [PATCH 2/2] Fixing the errors message when no message is received on a stream for 1 second --- faust/transport/drivers/aiokafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index e2ef0b901..c23801f73 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -820,7 +820,7 @@ async def _fetch_records( max_records=max_records, ) finally: - fetcher._fetch_waiters.clear() + pass async def create_topic( self,