railnova_kafka_mtls.py
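"""Example consumer for Railnova's Kafka: decodes Avro-encoded messages from a
topic over mutual TLS (mTLS)."""
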
import argparse
import logging
import os
import sys
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
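
# This example requires confluent-kafka with Avro support,
# e.g. `pip install confluent-kafka[avro]`.

# The CA certificate used to verify the broker is expected to sit next to this
# script as "ca.pem".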
SSL_CA_LOCATION = os.path.abspath(os.path.join(os.path.dirname(__file__), "ca.pem"))


class Arguments(argparse.Namespace):
    username: str
    password: str
    cert_path: str
    key_path: str
    topic: str
    hostname: str
    group_id: str


def arguments() -> Arguments:
    parser = argparse.ArgumentParser(
        description="Railnova Kafka Avro consumer example with mTLS"
    )
    parser.add_argument(
        "--username", dest="username", required=True, help="SASL username"
    )
    parser.add_argument(
        "--password", dest="password", required=True, help="SASL password"
    )
    parser.add_argument(
        "--certificate",
        dest="cert_path",
        required=True,
        help="Path to the Access Certificate file",
    )
    parser.add_argument(
        "--key", dest="key_path", required=True, help="Path to the Access Key file"
    )
    parser.add_argument("--topic", dest="topic", required=True, help="Kafka topic name")
    parser.add_argument(
        "--hostname",
        dest="hostname",
        default="kafka-13e7abdf-test-railnova-5ffc.aivencloud.com",
        help="Kafka broker hostname, defaults to Railnova's test broker",
    )
    parser.add_argument(
        "--group-id",
        dest="group_id",
        default="railnova_kafka_example",
        help="Kafka consumer group id",
    )
    return parser.parse_args()


def main() -> int:
    # Parse command-line arguments and configure a logger.
    args = arguments()
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger = logging.getLogger("railnova_kafka_example")
    logger.addHandler(log_handler)
    logger.setLevel(logging.INFO)

    # Create a deserializer and serialization contexts to decode message keys
    # and values from Avro, see
    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#avrodeserializer
    schema_registry = SchemaRegistryClient(
        {"url": f"https://{args.username}:{args.password}@{args.hostname}:27249"}
    )
    key_context = SerializationContext(args.topic, MessageField.KEY)
    value_context = SerializationContext(args.topic, MessageField.VALUE)
    avro_deserializer = AvroDeserializer(schema_registry)

    # Test connectivity to the schema registry by fetching the schema with id 1.
    schema_registry.get_schema(1)
    logger.info(f"Schema registry is accessible at '{args.hostname}'")

    # Create a Kafka consumer with a sensible configuration for a single consumer, see
    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
    kafka_consumer = Consumer(
        {
            "security.protocol": "SSL",
            "ssl.ca.location": SSL_CA_LOCATION,
            "ssl.certificate.location": args.cert_path,
            "ssl.key.location": args.key_path,
            "bootstrap.servers": f"{args.hostname}:27246",
            "message.max.bytes": 5000000,
            "group.id": args.group_id,
            "enable.auto.commit": True,  # commit offsets automatically.
            "auto.offset.reset": "earliest",  # start from the beginning of the topic (for this consumer group).
            "partition.assignment.strategy": "roundrobin",  # consume from all partitions in a round-robin fashion.
        },
        logger=logger,
    )
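
    # Note: the broker connection is authenticated with the client certificate
    # and key (mutual TLS); the username and password are only used for the
    # schema registry's basic auth above.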

    # Subscribe to the given topic.
    kafka_consumer.subscribe([args.topic])
    logger.info(f"Kafka consumer subscribed to topic '{args.topic}'")

    # Poll for messages until one is successfully consumed or a keyboard
    # interrupt (SIGINT) is received.
    while True:
        try:
            # SIGINT can't be handled while polling, so limit the timeout to 1 second.
            message = kafka_consumer.poll(1.0)
            if message is None:
                continue
            # Check for Kafka errors, as Confluent bundles them as messages.
            error: KafkaError | None = message.error()
            if error is None:
                # Log the deserialized message's key and value, then stop.
                k = avro_deserializer(message.key(), key_context)
                v = avro_deserializer(message.value(), value_context)
                logger.info(f"Received {k} -> {v}")
                break
            else:
                # Log the error message found in the message's value.
                logger.error(bytes.decode(message.value()))
        except KeyboardInterrupt:
            break

    kafka_consumer.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())
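
# Example invocation (the paths, credentials and topic below are placeholders):
#
#   python railnova_kafka_mtls.py \
#       --username avnadmin \
#       --password "$KAFKA_PASSWORD" \
#       --certificate service.cert \
#       --key service.key \
#       --topic my_topic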