Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InvalidReplicationFactorError is raised if reply_create_topic is set #76

Open
2 tasks done
ihor-rud opened this issue Jan 19, 2021 · 0 comments
Open
2 tasks done

Comments

@ihor-rud
Copy link

ihor-rud commented Jan 19, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Create faust app and set reply_create_topic=True flag.

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
)

greetings_topic = app.topic('greetings', value_type=str)


@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
        yield 'resp ' + greeting


@app.timer(5)
async def produce():
    for i in range(100):
        resp = await print_greetings.ask(value=f'hello {i}')
        print(resp)

if __name__== '__main__':
    app.main()

Expected behavior

Reply topic is created.

Actual behavior

InvalidReplicationFactorError is raised.

Full traceback

hello-word_1  | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9'
hello-word_1  | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.')
hello-word_1  | Traceback (most recent call last):
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
hello-word_1  |     await task
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped
hello-word_1  |     return await task()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer
hello-word_1  |     await fun(*args)
hello-word_1  |   File "/main.py", line 31, in produce
hello-word_1  |     resp = await print_greetings.ask(value=f'hello {i}')
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask
hello-word_1  |     await app._reply_consumer.add(p.correlation_id, p)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add
hello-word_1  |     await self._start_fetcher(reply_topic)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher
hello-word_1  |     await topic.maybe_declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare
hello-word_1  |     await self.declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare
hello-word_1  |     await producer.create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic
hello-word_1  |     await cast(Transport, self.transport)._create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic
hello-word_1  |     await wrap()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic
hello-word_1  |     raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
hello-word_1  | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.

Workaround

class MyTopic(faust.Topic):

    def __init__(self, *args, **kwargs):
        if kwargs.get('replicas') == 0:
            kwargs['replicas'] = 1
        super().__init__(*args, **kwargs)

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
    Topic=MyTopic
)

Proposed solution

I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to replicas=1 or replicas=None will solve the problem

Versions

  • Python version 3.8
  • Faust version 0.4.1
  • Operating system
    tested on python:3.8 docker container
root@0d7a8b87eab1:/# cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
  • Kafka version bitnami/kafka:2.7.0
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
  • RocksDB version (if applicable) None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant