-
Notifications
You must be signed in to change notification settings - Fork 233
/
Copy pathssl_consume_produce.py
40 lines (34 loc) · 1.22 KB
/
ssl_consume_produce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from aiokafka.structs import TopicPartition
# SSL context handed to both the producer and the consumer in this script.
context = create_ssl_context(
    # CA used to sign the certificate (`CARoot` of the JKS store container).
    cafile="./ca-cert",
    # Signed certificate.
    certfile="./cert-signed",
    # Private key file of the `certfile` certificate.
    keyfile="./cert-key",
    password="123123",
)
async def produce_and_consume(
        bootstrap_servers='localhost:9093', topic='my_topic', partition=0,
        payload=b"Super Message"):
    """Send one message over SSL and then read that same message back.

    A producer publishes ``payload`` to ``topic``/``partition`` and waits for
    the send to complete.  A consumer is then started on the same topic,
    moved to the offset reported for the produced message, and a single
    record is fetched.  Both results are printed.

    Both clients use ``security_protocol="SSL"`` with the module-level
    ``context`` object.

    Args:
        bootstrap_servers: Broker address passed to both clients.
        topic: Topic to produce to and consume from.
        partition: Partition the message is sent to and later read from.
        payload: Message value to send.
    """
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SSL", ssl_context=context)
    await producer.start()
    try:
        msg = await producer.send_and_wait(
            topic, payload, partition=partition)
    finally:
        # Stop the producer even if the send raised.
        await producer.stop()

    # Consume
    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_servers,
        security_protocol="SSL", ssl_context=context)
    await consumer.start()
    try:
        # Jump to the offset of the message produced above so that the
        # record fetched next is that message.
        consumer.seek(TopicPartition(topic, partition), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        # Stop the consumer even if seek/fetch raised.
        await consumer.stop()

    print("Success", msg, fetch_msg)
if __name__ == "__main__":
    # Script entry point: run the produce/consume round trip to completion.
    round_trip = produce_and_consume()
    asyncio.run(round_trip)