We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
master
Create faust app and set reply_create_topic=True flag.
app = faust.App( 'hello-world', broker='kafka://kafka:9092', reply_create_topic=True, ) greetings_topic = app.topic('greetings', value_type=str) @app.agent(greetings_topic) async def print_greetings(greetings): async for greeting in greetings: print(greeting) yield 'resp ' + greeting @app.timer(5) async def produce(): for i in range(100): resp = await print_greetings.ask(value=f'hello {i}') print(resp) if __name__== '__main__': app.main()
Reply topic is created.
InvalidReplicationFactorError is raised.
hello-word_1 | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9' hello-word_1 | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.') hello-word_1 | Traceback (most recent call last): hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task hello-word_1 | await task hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped hello-word_1 | return await task() hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer hello-word_1 | await fun(*args) hello-word_1 | File "/main.py", line 31, in produce hello-word_1 | resp = await print_greetings.ask(value=f'hello {i}') hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask hello-word_1 | await app._reply_consumer.add(p.correlation_id, p) hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add hello-word_1 | await self._start_fetcher(reply_topic) hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher hello-word_1 | await topic.maybe_declare() hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__ hello-word_1 | result = await self.fun(*self.args, **self.kwargs) hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare hello-word_1 | await self.declare() hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare hello-word_1 | await producer.create_topic( hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic hello-word_1 | await cast(Transport, self.transport)._create_topic( hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic hello-word_1 | await wrap() hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__ hello-word_1 | result = await self.fun(*self.args, **self.kwargs) hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic hello-word_1 | raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}") hello-word_1 | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.
class MyTopic(faust.Topic): def __init__(self, *args, **kwargs): if kwargs.get('replicas') == 0: kwargs['replicas'] = 1 super().__init__(*args, **kwargs) app = faust.App( 'hello-world', broker='kafka://kafka:9092', reply_create_topic=True, Topic=MyTopic )
I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to replicas=1 or replicas=None will solve the problem
replicas=1
replicas=None
root@0d7a8b87eab1:/# cat /etc/os-release PRETTY_NAME="Debian GNU/Linux 10 (buster)" NAME="Debian GNU/Linux" VERSION_ID="10" VERSION="10 (buster)" VERSION_CODENAME=buster ID=debian
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version 2.7.0 (Commit:448719dc99a19793)
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Checklist
master
branch of Faust.Steps to reproduce
Create faust app and set reply_create_topic=True flag.
Expected behavior
Reply topic is created.
Actual behavior
InvalidReplicationFactorError is raised.
Full traceback
Workaround
Proposed solution
I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to
replicas=1
orreplicas=None
will solve the problemVersions
tested on python:3.8 docker container
The text was updated successfully, but these errors were encountered: