SASL connect fails with RuntimeError: await wasn't used with future #580

galen211 opened this issue Feb 6, 2020 · 6 comments

galen211 commented Feb 6, 2020

I'm trying to connect to my Kafka cluster (confluent cloud) using a modified version of the ssl_consume_produce.py example from the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py.

I've configured my AIOKafkaConsumer and AIOKafkaProducer with the correct SASL config, but am getting the error RuntimeError: await wasn't used with future. I've included my config, error details, and the ssl_consume_produce.py example below.

config:

bootstrap.servers=*********.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="*********" password\="******************";
sasl.username=*********
sasl.password=*********
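For readers mapping this Java-style properties file onto aiokafka, here is a minimal sketch of one way to do it. The helper names `parse_properties` and `to_aiokafka_kwargs` and the sample values are hypothetical, not part of aiokafka or of `ccloud_lib`. Java-client-only keys such as `sasl.jaas.config` are simply left unmapped here, on the assumption that the username and password keys carry the same credentials.

```python
def parse_properties(text):
    """Parse Java-style key=value lines into a dict, skipping blanks and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        conf[key.strip()] = value.strip()
    return conf


def to_aiokafka_kwargs(conf):
    """Map property names to the keyword arguments used in the script below."""
    return {
        "bootstrap_servers": conf["bootstrap.servers"],
        "security_protocol": conf["security.protocol"],
        "sasl_mechanism": conf["sasl.mechanism"],
        "sasl_plain_username": conf["sasl.username"],
        "sasl_plain_password": conf["sasl.password"],
    }


# Placeholder values, not real credentials.
sample = """
bootstrap.servers=broker.example.com:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=my-api-key
sasl.password=my-api-secret
"""
kwargs = to_aiokafka_kwargs(parse_properties(sample))
print(kwargs["security_protocol"])
```

The resulting dict could then be passed as `AIOKafkaProducer(**kwargs, ssl_context=...)`.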

error logs:

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
[2020-02-06 17:26:57,060] DEBUG [asyncio]: Using selector: KqueueSelector
[2020-02-06 17:26:57,061] DEBUG [aiokafka.producer.producer]: Starting the Kafka producer
[2020-02-06 17:26:57,061] DEBUG [aiokafka]: Attempting to bootstrap via node at pkc-43n10.us-central1.gcp.confluent.cloud:9092
[2020-02-06 17:26:57,223] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 1: ApiVersionRequest_v0()
[2020-02-06 17:26:57,265] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, 
min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=10000, min_version=0, max_version=0)])
[2020-02-06 17:26:57,266] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
[2020-02-06 17:26:57,303] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 78, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 73, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 34, in produce_and_consume
    start_future = await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
[2020-02-06 17:26:57,315] ERROR [asyncio]: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fcee03a84d0>

Process finished with exit code 1

ssl_consume_produce.py:

import asyncio
import os
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from aiokafka.errors import KafkaError

from aiokafka import AIOKafkaClient

import ccloud_lib

conf = ccloud_lib.read_ccloud_config('kafka_config.conf')
ssl_context = create_ssl_context(cafile='cacert.pem')

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        ssl_context=ssl_context,
        sasl_plain_username=conf['sasl.username'],
        sasl_plain_password=conf['sasl.password'],
        api_version='0.10'
    )
    try:
        start_future = await producer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))

    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    try:
        start_future = await consumer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))


    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

@galen211 changed the title from "Unable to connect to confluent cloud" to "SASL connect fails with RuntimeError: await wasn't used with future" on Feb 7, 2020
@tvoinarovskyi added this to the 0.6.1 milestone on May 11, 2020

@tvoinarovskyi
Member

Sorry, it was probably some version of an unstable master at the time. =( I did make a mistake there in 0.5.2 I think. Should be fixed in 0.6.0.


offero commented Dec 9, 2021

We are still seeing this issue in 0.7.2.

Collaborator

ods commented Dec 11, 2021

> We are still seeing this issue in 0.7.2.

The example here doesn't look like working code: neither AIOKafkaConsumer.start() nor AIOKafkaProducer.start() returns a future. It would also be very helpful if you provided a self-contained example that reproduces the problem, e.g. a Python script + Dockerfile + docker-compose.yml.
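A minimal sketch of the start/send/stop pattern with `start()` awaited once and its result not awaited again. The `produce_one` helper, the `make_producer` factory argument, and all connection values are placeholders for illustration; only the `AIOKafkaProducer` methods and `create_ssl_context` come from aiokafka.

```python
import asyncio


async def produce_one(make_producer, topic, value):
    """Start a producer, send one message, and always stop the producer."""
    # Build the producer inside the coroutine so it binds to the running loop.
    producer = make_producer()
    # start() is a coroutine that returns None; there is no second future to await.
    await producer.start()
    try:
        return await producer.send_and_wait(topic, value)
    finally:
        await producer.stop()


def main():
    # Imported here so produce_one() can be exercised without aiokafka installed.
    from aiokafka import AIOKafkaProducer
    from aiokafka.helpers import create_ssl_context

    def make_producer():
        return AIOKafkaProducer(
            bootstrap_servers="broker.example.com:9092",  # placeholder
            security_protocol="SASL_SSL",
            sasl_mechanism="PLAIN",
            sasl_plain_username="my-api-key",  # placeholder
            sasl_plain_password="my-api-secret",  # placeholder
            ssl_context=create_ssl_context(),
        )

    metadata = asyncio.run(produce_one(make_producer, "my_topic", b"Super Message"))
    print("Success", metadata)
```

Calling `main()` would attempt a real connection, so it is left uncalled here. Taking a factory rather than a producer instance keeps the helper testable with a stand-in object.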


offero commented Dec 13, 2021

We think the issue could be in Python 3.8 and 3.9 when using wait_for (https://bugs.python.org/issue42130).

Collaborator

ods commented Dec 14, 2021

> We think the issue could be in Python 3.8 and 3.9 when using wait_for (https://bugs.python.org/issue42130).

Could you then try it with current code in master please? It uses async-timeout as a replacement for wait_for().
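For comparison, a rough sketch of the two timeout styles being discussed: `asyncio.wait_for()` versus a timeout context manager. This is not aiokafka's internal code. The function names are made up for illustration, and the import falls back to `asyncio.timeout` (Python 3.11+) when the third-party `async_timeout` package is not installed.

```python
import asyncio

try:
    from async_timeout import timeout  # third-party async-timeout package
except ImportError:
    from asyncio import timeout  # standard-library equivalent, Python 3.11+


async def with_wait_for(delay, limit):
    """Bound an operation with asyncio.wait_for(), which wraps it in a new task."""
    try:
        await asyncio.wait_for(asyncio.sleep(delay), timeout=limit)
        return "done"
    except asyncio.TimeoutError:
        return "timed out"


async def with_timeout_context(delay, limit):
    """Bound the same operation with a context manager, awaiting it in place."""
    try:
        async with timeout(limit):
            await asyncio.sleep(delay)
        return "done"
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(with_wait_for(0.5, 0.01)))
print(asyncio.run(with_timeout_context(0.5, 0.01)))
```

Both calls time out in this sketch; the difference is only in how the awaited operation is scheduled and cancelled.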


offero commented Dec 15, 2021

It got past the previous error, but it still errors with:

...line 45, in start_consumer
    await consumer.start()
  File "/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 346, in start
    await self._client.bootstrap()
  File "/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/client.py", line 210, in bootstrap
    bootstrap_conn = await create_conn(
  File "/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py", line 97, in create_conn
    await conn.connect()
  File "/home/user/workspace/venvs/myenv/lib/python3.8/site-packages/aiokafka/conn.py", line 217, in connect
    transport, _ = await loop.create_connection(
  File "/usr/lib64/python3.8/asyncio/base_events.py", line 986, in create_connection
    infos = await self._ensure_resolved(
  File "/usr/lib64/python3.8/asyncio/base_events.py", line 1365, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
  File "/usr/lib64/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
    return await self.run_in_executor(
builtins.RuntimeError: await wasn't used with future
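For context on the message itself: asyncio's Future raises it when a coroutine suspended on a still-pending future is resumed before that future completes. The sketch below reproduces this in isolation by stepping a coroutine by hand instead of letting an asyncio Task drive it. It only illustrates where the message comes from and is not a diagnosis of the tracebacks in this thread; `wait_on` is a made-up name.

```python
import asyncio


async def wait_on(fut):
    """Suspend on a future, as library code does when awaiting I/O."""
    return await fut


loop = asyncio.new_event_loop()
fut = loop.create_future()  # pending, and never completed in this sketch
coro = wait_on(fut)
message = None

coro.send(None)  # first step: the pending future yields itself to its driver
try:
    coro.send(None)  # resumed too early: the future is still pending
except RuntimeError as exc:
    message = str(exc)
finally:
    loop.close()

print(message)  # await wasn't used with future
```

An asyncio Task resumes the coroutine only after the future's done-callback fires, which is why the error does not appear when the coroutine runs under the loop it was created for.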

