Skip to content

Commit

Permalink
Bumping version to 0.2.2rc6
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Nov 24, 2020
1 parent 2f81086 commit c7974e5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 0.2.2
### Fixed
- Consumer offsets not progressing for certain partitions
- Agent dies silenty when mode cancels pending tasks
- Agent dies silenty when mode cancels pending tasks [678](https://github.com/robinhood/faust/issues/678)

## 0.2.1

Expand Down
2 changes: 1 addition & 1 deletion faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import typing
from typing import Any, Mapping, NamedTuple, Optional, Sequence, Tuple

__version__ = "0.2.2"
__version__ = "0.2.2rc6"
__author__ = "Robinhood Markets, Inc."
__contact__ = "[email protected], [email protected]"
__homepage__ = "https://github.com/faust-streaming/faust"
Expand Down
9 changes: 6 additions & 3 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
return None

async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
signal = self.signal_recovery_start.wait()
signal = self.signal_recovery_start
wait_result = await self.wait_first(coro, signal, timeout=timeout)
if wait_result.stopped:
# service was stopped.
Expand Down Expand Up @@ -737,7 +737,7 @@ def _maybe_signal_recovery_end() -> None:
message = event.message
tp = message.tp
offset = message.offset

logger.debug(f"Recovery message topic {tp} offset {offset}")
offsets: Counter[TP]
bufsize = buffer_sizes.get(tp)
is_active = False
Expand All @@ -755,9 +755,12 @@ def _maybe_signal_recovery_end() -> None:
bufsize = buffer_sizes[tp] = table.standby_buffer_size
standby_events_received_at[tp] = now
else:
continue
logger.warning(f"recovery unknown topic {tp} offset {offset}")

seen_offset = offsets.get(tp, None)
logger.debug(
f"seen offset for {tp} is {seen_offset} message offset {offset}"
)
if seen_offset is None or offset > seen_offset:
offsets[tp] = offset
buf = buffers[table]
Expand Down
30 changes: 19 additions & 11 deletions faust/transport/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async def on_message(message: Message) -> None:
# so that if a DecodeError is raised we can propagate
# that error to the remaining channels.
delivered: Set[_Topic] = set()
full: typing.List[Tuple[EventT, _Topic]] = []
try:
for chan in channels:
keyid = chan.key_type, chan.value_type
Expand All @@ -133,11 +134,10 @@ async def on_message(message: Message) -> None:
event_keyid = keyid

queue = chan.queue
queue.put_nowait_enhanced(
event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():

This comment has been minimized.

Copy link
@forsberg

forsberg Nov 24, 2020

Contributor

Hmm.. this does synchronize behaviour with the Cython version, but it also changes how things work since the on_buffer_full/on_pressure_drop functions in consumer won't be called anymore - I'm unsure on the effects of this.

Perhaps it's better to port the put_nowait_enhanced variant to Cython?

full.append((event, chan))
continue
queue.put_nowait(event)
else:
# subsequent channels may have a different
# key/value type pair, meaning they all can
Expand All @@ -150,13 +150,21 @@ async def on_message(message: Message) -> None:
else:
dest_event = await chan.decode(message, propagate=True)
queue = chan.queue
queue.put_nowait_enhanced(
dest_event,
on_pressure_high=on_pressure_high,
on_pressure_drop=on_pressure_drop,
)
if queue.full():
full.append((dest_event, chan))
continue
queue.put_nowait(dest_event)
delivered.add(chan)

if full:
for _, dest_chan in full:
on_topic_buffer_full(dest_chan)
await asyncio.wait(
[
dest_chan.put(dest_event)
for dest_event, dest_chan in full
],
return_when=asyncio.ALL_COMPLETED,
)
except KeyDecodeError as exc:
remaining = channels - delivered
message.ack(app.consumer, n=len(remaining))
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ def _new_offset(self, tp: TP) -> Optional[int]:
acked[: len(batch)] = []
self._acked_index[tp].difference_update(batch)
# return the highest commit offset
return batch[-1] + 1
return batch[-1]
return None

async def on_task_error(self, exc: BaseException) -> None:
Expand Down

0 comments on commit c7974e5

Please sign in to comment.