Skip to content

Commit

Permalink
Bumping version to 0.2.2rc6 (#45)
Browse files Browse the repository at this point in the history
* Bumping version to 0.2.2rc6

* Fixing the errors message when no message is received on a stream for 1 second
  • Loading branch information
patkivikram authored Nov 24, 2020
1 parent 2f81086 commit 136b781
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 0.2.2
### Fixed
- Consumer offsets not progressing for certain partitions
- Agent dies silenty when mode cancels pending tasks
- Agent dies silenty when mode cancels pending tasks [678](https://github.com/robinhood/faust/issues/678)

## 0.2.1

Expand Down
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.2.2"
__version__ = "0.2.2rc6"
__author__ = "Robinhood Markets, Inc."
__contact__ = "[email protected], [email protected]"
__homepage__ = "https://github.com/faust-streaming/faust"
Expand Down
9 changes: 6 additions & 3 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
return None

async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
signal = self.signal_recovery_start.wait()
signal = self.signal_recovery_start
wait_result = await self.wait_first(coro, signal, timeout=timeout)
if wait_result.stopped:
# service was stopped.
Expand Down Expand Up @@ -737,7 +737,7 @@ def _maybe_signal_recovery_end() -> None:
message = event.message
tp = message.tp
offset = message.offset

logger.debug(f"Recovery message topic {tp} offset {offset}")
offsets: Counter[TP]
bufsize = buffer_sizes.get(tp)
is_active = False
Expand All @@ -755,9 +755,12 @@ def _maybe_signal_recovery_end() -> None:
bufsize = buffer_sizes[tp] = table.standby_buffer_size
standby_events_received_at[tp] = now
else:
continue
logger.warning(f"recovery unknown topic {tp} offset {offset}")

seen_offset = offsets.get(tp, None)
logger.debug(
f"seen offset for {tp} is {seen_offset} message offset {offset}"
)
if seen_offset is None or offset > seen_offset:
offsets[tp] = offset
buf = buffers[table]
Expand Down
30 changes: 19 additions & 11 deletions faust/transport/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async def on_message(message: Message) -> None:
# so that if a DecodeError is raised we can propagate
# that error to the remaining channels.
delivered: Set[_Topic] = set()
full: typing.List[Tuple[EventT, _Topic]] = []
try:
for chan in channels:
keyid = chan.key_type, chan.value_type
Expand All @@ -133,11 +134,10 @@ async def on_message(message: Message) -> None:
event_keyid = keyid

queue = chan.queue
queue.put_nowait_enhanced(
event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():
full.append((event, chan))
continue
queue.put_nowait(event)
else:
# subsequent channels may have a different
# key/value type pair, meaning they all can
Expand All @@ -150,13 +150,21 @@ async def on_message(message: Message) -> None:
else:
dest_event = await chan.decode(message, propagate=True)
queue = chan.queue
queue.put_nowait_enhanced(
dest_event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():
full.append((dest_event, chan))
continue
queue.put_nowait(dest_event)
delivered.add(chan)

if full:
for _, dest_chan in full:
on_topic_buffer_full(dest_chan)
await asyncio.wait(
[
dest_chan.put(dest_event)
for dest_event, dest_chan in full
],
return_when=asyncio.ALL_COMPLETED,
)
except KeyDecodeError as exc:
remaining = channels - delivered
message.ack(app.consumer, n=len(remaining))
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked[: len(batch)] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1] + 1
return batch[-1]

This comment has been minimized.

Copy link
@jkgenser

jkgenser Nov 29, 2020

If I restart a worker, the agent always consumes the most recent record that was acked. If I revert this change, then that no longer happens. So this change seems to have introduced an error where the consumer is not acknowledging the most recent message that was processed

return None

async def on_task_error(self, exc: BaseException) -> None:
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ async def _fetch_records(
max_records=max_records,
)
finally:
fetcher._fetch_waiters.clear()
pass

async def create_topic(
self,
Expand Down

0 comments on commit 136b781

Please sign in to comment.