Bumping version to 0.2.2rc6 #45

Merged
merged 2 commits on Nov 24, 2020
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 0.2.2
### Fixed
- Consumer offsets not progressing for certain partitions
- Agent dies silently when mode cancels pending tasks
- Agent dies silently when mode cancels pending tasks [678](https://github.com/robinhood/faust/issues/678)

## 0.2.1

2 changes: 1 addition & 1 deletion faust/__init__.py
@@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.2.2"
__version__ = "0.2.2rc6"
__author__ = "Robinhood Markets, Inc."
__contact__ = "[email protected], [email protected]"
__homepage__ = "https://github.com/faust-streaming/faust"
9 changes: 6 additions & 3 deletions faust/tables/recovery.py
@@ -552,7 +552,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
return None

async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
signal = self.signal_recovery_start.wait()
signal = self.signal_recovery_start
wait_result = await self.wait_first(coro, signal, timeout=timeout)
if wait_result.stopped:
# service was stopped.
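Note on the `_wait` change above: the old code passed the coroutine returned by `self.signal_recovery_start.wait()` into `wait_first`, creating a fresh one-shot coroutine on every call; the new code passes the event object itself and lets the helper decide when (and whether) to await it. A minimal sketch of the idea using plain `asyncio` primitives — the `wait_first` helper below is a hypothetical stand-in for illustration, not faust/mode's actual implementation:

```python
import asyncio


async def wait_first(*sources, timeout=None):
    """Hypothetical stand-in: return when the first source completes.

    Sources may be coroutines or event-like objects.
    """
    tasks = []
    for src in sources:
        if isinstance(src, asyncio.Event):
            # The helper calls .wait() itself, only when it is about to await.
            tasks.append(asyncio.create_task(src.wait()))
        else:
            tasks.append(asyncio.ensure_future(src))
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    return done


async def main():
    signal = asyncio.Event()

    async def recovery_step():
        await asyncio.sleep(0.1)

    # Passing `signal` (the event) instead of `signal.wait()` (a one-shot
    # coroutine) avoids creating a coroutine that may never be awaited.
    await wait_first(recovery_step(), signal, timeout=1.0)


asyncio.run(main())
```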
@@ -737,7 +737,7 @@ def _maybe_signal_recovery_end() -> None:
message = event.message
tp = message.tp
offset = message.offset

logger.debug(f"Recovery message topic {tp} offset {offset}")
offsets: Counter[TP]
bufsize = buffer_sizes.get(tp)
is_active = False
@@ -755,9 +755,12 @@ def _maybe_signal_recovery_end() -> None:
bufsize = buffer_sizes[tp] = table.standby_buffer_size
standby_events_received_at[tp] = now
else:
continue
logger.warning(f"recovery unknown topic {tp} offset {offset}")

seen_offset = offsets.get(tp, None)
logger.debug(
f"seen offset for {tp} is {seen_offset} message offset {offset}"
)
if seen_offset is None or offset > seen_offset:
offsets[tp] = offset
buf = buffers[table]
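The second hunk above replaces the silent `continue` for unknown topics with a warning and logs the last seen offset before deciding whether to buffer an event. The guard itself is unchanged: an event is only buffered when its offset is newer than the last offset recorded for that topic-partition. A small self-contained sketch of that guard (names are illustrative, not the module's actual internals):

```python
from typing import Dict, Tuple

TP = Tuple[str, int]  # (topic, partition)


def should_buffer(offsets: Dict[TP, int], tp: TP, offset: int) -> bool:
    """Accept an event only if its offset is newer than the last one seen."""
    seen_offset = offsets.get(tp)
    if seen_offset is None or offset > seen_offset:
        offsets[tp] = offset
        return True
    return False


offsets: Dict[TP, int] = {}
assert should_buffer(offsets, ("table-changelog", 0), 42) is True
assert should_buffer(offsets, ("table-changelog", 0), 42) is False  # duplicate
assert should_buffer(offsets, ("table-changelog", 0), 43) is True
```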
30 changes: 19 additions & 11 deletions faust/transport/conductor.py
@@ -124,6 +124,7 @@ async def on_message(message: Message) -> None:
# so that if a DecodeError is raised we can propagate
# that error to the remaining channels.
delivered: Set[_Topic] = set()
full: typing.List[Tuple[EventT, _Topic]] = []
try:
for chan in channels:
keyid = chan.key_type, chan.value_type
@@ -133,11 +134,10 @@ async def on_message(message: Message) -> None:
event_keyid = keyid

queue = chan.queue
queue.put_nowait_enhanced(
event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():
full.append((event, chan))
continue
queue.put_nowait(event)
else:
# subsequent channels may have a different
# key/value type pair, meaning they all can
@@ -150,13 +150,21 @@
else:
dest_event = await chan.decode(message, propagate=True)
queue = chan.queue
queue.put_nowait_enhanced(
dest_event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():
full.append((dest_event, chan))
continue
queue.put_nowait(dest_event)
delivered.add(chan)

if full:
for _, dest_chan in full:
on_topic_buffer_full(dest_chan)
await asyncio.wait(
[
dest_chan.put(dest_event)
for dest_event, dest_chan in full
],
return_when=asyncio.ALL_COMPLETED,
)
except KeyDecodeError as exc:
remaining = channels - delivered
message.ack(app.consumer, n=len(remaining))
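The conductor change above replaces `queue.put_nowait_enhanced(...)` with an explicit backpressure path: events destined for full queues are collected in `full`, `on_topic_buffer_full` is signalled for each affected channel, and the conductor then awaits `chan.put(...)` for all of them before moving on. A hedged sketch of the same pattern using plain `asyncio.Queue` objects (faust's channel and queue types are not used here):

```python
import asyncio
from typing import Any, List, Tuple


async def fan_out(event: Any, queues: List[asyncio.Queue]) -> None:
    """Deliver one event to many queues, blocking only on the full ones."""
    full: List[Tuple[Any, asyncio.Queue]] = []
    for queue in queues:
        if queue.full():
            full.append((event, queue))
            continue
        queue.put_nowait(event)  # fast path: non-blocking delivery

    if full:
        # Block until each full queue has drained enough to accept the event.
        await asyncio.wait(
            [asyncio.create_task(q.put(ev)) for ev, q in full],
            return_when=asyncio.ALL_COMPLETED,
        )


async def main() -> None:
    q1: asyncio.Queue = asyncio.Queue(maxsize=1)
    q2: asyncio.Queue = asyncio.Queue(maxsize=1)
    q1.put_nowait("older event")  # q1 is now full

    consumer = asyncio.create_task(q1.get())  # frees space shortly
    await fan_out("new event", [q1, q2])
    await consumer


asyncio.run(main())
```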
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
@@ -1031,7 +1031,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked[: len(batch)] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1] + 1
return batch[-1]
return None

async def on_task_error(self, exc: BaseException) -> None:
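For context on the `_new_offset` change: the method finds the longest consecutive run of acked offsets for a partition and previously returned `batch[-1] + 1` (the next offset to consume, Kafka's usual commit convention); it now returns `batch[-1]`, the highest acked offset itself. A toy reconstruction of the gap-detection logic (assumed structure for illustration, not the real method, which also maintains `self._acked_index`):

```python
from typing import List, Optional


def new_offset(acked: List[int]) -> Optional[int]:
    """Toy version: commit up to the end of the first consecutive run."""
    if not acked:
        return None
    acked = sorted(acked)
    batch = [acked[0]]
    for offset in acked[1:]:
        if offset != batch[-1] + 1:
            break  # gap found: offsets beyond it are not committable yet
        batch.append(offset)
    return batch[-1]


assert new_offset([1, 2, 3, 5]) == 3  # the old code would have returned 4
assert new_offset([]) is None
```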
2 changes: 1 addition & 1 deletion faust/transport/drivers/aiokafka.py
@@ -820,7 +820,7 @@ async def _fetch_records(
max_records=max_records,
)
finally:
fetcher._fetch_waiters.clear()
pass

async def create_topic(
self,