The puavro library is a small convenience library that lets the Apache Pulsar Python client work with predefined AVRO schemas and plain Python dictionaries, instead of AVRO schemas declared as records (subclasses of `pulsar.schema.Record`).
In other words, the library provides an interface to the standard Apache Pulsar client for reading AVRO messages into Python dictionaries and writing them from Python dictionaries, using an AVRO schema that is either:

- declared as a Python dictionary (using `fastavro.schema.parse_schema()`), or
- loaded from an .avsc file (using `fastavro.schema.load_schema()`), or
- parsed from a JSON string (using `fastavro.schema.parse_schema(json.loads())`).
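All three routes start from the same JSON-shaped description of the schema. The sketch below uses only the standard library and a made-up `Point` schema (an assumption for illustration, not part of puavro) to show that a dictionary literal, an .avsc file and a JSON string yield equal Python dictionaries. In real code the result would then go to `fastavro.schema.parse_schema()`; the .avsc route normally goes through `fastavro.schema.load_schema()`, which can also load referenced named types from other .avsc files in the same directory.

```python
import json
import os
import tempfile

# Hypothetical schema used only for this illustration.
POINT_SCHEMA = {
    "type": "record",
    "name": "Point",
    "namespace": "example",
    "fields": [
        {"name": "x", "type": "long"},
        {"name": "y", "type": "long"},
    ],
}

# 1. Declared directly as a Python dictionary.
from_dict = POINT_SCHEMA

# 2. Stored in an .avsc file (an .avsc file is plain JSON).
with tempfile.TemporaryDirectory() as tmp:
    avsc_path = os.path.join(tmp, "example.Point.avsc")
    with open(avsc_path, "w", encoding="utf-8") as f:
        json.dump(POINT_SCHEMA, f, indent=2)
    with open(avsc_path, encoding="utf-8") as f:
        from_file = json.load(f)

# 3. Received as a JSON string.
json_text = json.dumps(POINT_SCHEMA)
from_string = json.loads(json_text)

# All three are the same dictionary, ready to be handed to fastavro.
print(from_dict == from_file == from_string)  # True
```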
The puavro library consists of just two classes:

- `DictAVRO`, derived from the Python `dict` and intended to be used instead of the `pulsar.schema.Record` class;
- `DictAvroSchema`, derived from `pulsar.schema.AvroSchema` and intended to be used instead of it.
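To make the idea of "a record that is just a dict" concrete, here is a hypothetical sketch of a `dict` subclass whose class carries an AVRO schema. It is NOT puavro's source code: `SchemaDict`, `attach_schema` and `field_names` are illustrative names chosen here, and the actual `DictAVRO` may be implemented differently.

```python
from typing import ClassVar, Optional


class SchemaDict(dict):
    """Hypothetical illustration, not puavro's implementation:
    a plain dict whose class carries an AVRO record schema."""

    SCHEMA: ClassVar[Optional[dict]] = None

    @classmethod
    def attach_schema(cls, schema: dict) -> None:
        # Store the schema on the subclass it is called on.
        cls.SCHEMA = schema

    def field_names(self) -> list:
        # Names of the fields declared by the record schema.
        return [f["name"] for f in type(self).SCHEMA["fields"]]


class Point(SchemaDict):
    pass


Point.attach_schema({
    "type": "record",
    "name": "Point",
    "fields": [{"name": "x", "type": "long"}, {"name": "y", "type": "long"}],
})

p = Point(x=1, y=2)
print(isinstance(p, dict), p["x"], p.field_names())  # True 1 ['x', 'y']
```

Because instances are ordinary dictionaries, they can be built, inspected and passed around without any record-class machinery; only the class-level schema is needed for serialization.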
The library aims to let the Python Pulsar client exchange AVRO messages with modules written in other languages that produce or consume them using external AVRO schemas (e.g. schemas stored in .avsc files).
puavro is available on PyPI:

```shell
pip install puavro
```
The library depends on the following modules:

- fastavro>=1.4.4
- pulsar-client>=2.7.0
The library has been tested against Pulsar Python client versions 2.7.0 and 2.8.0 and fastavro 1.4.4, and is expected to be compatible with later versions as well.
The library is provided under terms of the MIT license.
The samples in this section assume the following imports:

```python
import pulsar
import fastavro
import puavro
import json
import datetime
from pprint import pp  # pprint.pp requires Python 3.8+
```
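Define a message class by deriving from `puavro.DictAVRO` and assigning its AVRO schema to the `SCHEMA` class attribute, for example by loading it from an .avsc file:

```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.load_schema("Segment.avsc")
```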
or
```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema(json.loads("""{
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    }
    """))
```
or
```python
class Segment(puavro.DictAVRO):
    SCHEMA = fastavro.schema.parse_schema({
        "type" : "record",
        "name" : "Segment",
        "namespace" : "try",
        "fields" : [ {
            "name" : "id",
            "type" : "long"
        }, {
            "name" : "name",
            "type" : "string"
        }, {
            "name" : "when",
            "type" : {
                "type" : "long",
                "logicalType" : "timestamp-millis"
            }
        }, {
            "name" : "direction",
            "type" : {
                "type" : "enum",
                "name" : "CardinalDirection",
                "symbols" : [ "north", "south", "east", "west" ]
            }
        }, {
            "name" : "length",
            "type" : [ "null", "long" ]
        } ]
    })
```
or
```python
class Segment(puavro.DictAVRO):
    pass

# Attach the schema after the class has been defined
Segment.set_schema(fastavro.schema.load_schema("Segment.avsc"))
```
To produce messages, use the `Segment` class defined above (derived from `puavro.DictAVRO`) together with the `puavro.DictAvroSchema` class (instead of `pulsar.schema.AvroSchema`):
```python
PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
producer = pulsar_client.create_producer(topic=TOPIC,
                                         schema=puavro.DictAvroSchema(Segment))
try:
    # Fields are passed as keyword arguments, as with a plain dict
    segment = Segment(
        id=99,
        name="some name",
        when=datetime.datetime.now(datetime.timezone.utc),  # timezone-aware UTC timestamp
        direction="north",  # must be one of the CardinalDirection symbols
        length=12345,  # nullable field: None is also valid
    )
    producer.send(segment)
finally:
    producer.close()
    pulsar_client.close()
```
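In the `Segment` schema, `when` has the `timestamp-millis` logical type: on the wire it is a `long` holding milliseconds since the Unix epoch, and fastavro converts the `datetime` value during serialization. The standard-library sketch below shows that arithmetic for a fixed, timezone-aware instant; `to_timestamp_millis` and `from_timestamp_millis` are hypothetical helper names, not part of puavro or fastavro.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def to_timestamp_millis(when: datetime.datetime) -> int:
    """Hypothetical helper: milliseconds since the Unix epoch, the value
    stored for the AVRO timestamp-millis logical type."""
    if when.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # timedelta // timedelta floors to an int, dropping sub-millisecond parts.
    return (when - EPOCH) // datetime.timedelta(milliseconds=1)


def from_timestamp_millis(millis: int) -> datetime.datetime:
    """Inverse conversion back to an aware UTC datetime."""
    return EPOCH + datetime.timedelta(milliseconds=millis)


instant = datetime.datetime(2021, 1, 1, 12, 0, 0, 345000,
                            tzinfo=datetime.timezone.utc)
millis = to_timestamp_millis(instant)
print(millis)  # 1609502400345
print(from_timestamp_millis(millis) == instant)  # True
```

Using an aware datetime, as the producer sample does, keeps the conversion unambiguous regardless of the machine's local time zone.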
To consume messages, use the same `Segment` class (derived from `puavro.DictAVRO`) together with the `puavro.DictAvroSchema` class (instead of `pulsar.schema.AvroSchema`):
```python
PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TOPIC = "try"
WAIT_SECONDS = 3

pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
consumer = pulsar_client.subscribe(TOPIC,
                                   subscription_name="sample",
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   schema=puavro.DictAvroSchema(Segment))
try:
    while True:
        # Wait up to WAIT_SECONDS for the next message (timeout is in milliseconds)
        msg = consumer.receive(WAIT_SECONDS * 1000)
        segment = msg.value()  # decoded message as a dict
        pp(segment)
        consumer.acknowledge(msg)
except Exception as e:
    # A receive timeout surfaces as an exception with this message text
    if str(e) == 'Pulsar error: TimeOut':
        print("END OF DATA")
    else:
        raise
finally:
    consumer.close()
    pulsar_client.close()
```
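The consumer loop above reads until the first receive timeout. The same pattern can be factored into a generator. This is a sketch under stated assumptions: `drain`, `FakeMessage` and `FakeConsumer` are hypothetical names, a timeout is recognized by the `'Pulsar error: TimeOut'` message text checked in the sample (other client versions may report timeouts differently), and the fake consumer stands in for a real Pulsar consumer so the code runs without a broker.

```python
from typing import Any, Iterator

TIMEOUT_MESSAGE = "Pulsar error: TimeOut"  # text checked in the consumer sample


def drain(consumer: Any, timeout_ms: int) -> Iterator[Any]:
    """Hypothetical helper: yield decoded values until receive() times out."""
    while True:
        try:
            msg = consumer.receive(timeout_ms)
        except Exception as e:
            if str(e) == TIMEOUT_MESSAGE:
                return  # no message arrived within the timeout
            raise
        yield msg.value()
        # Reached when the caller requests the next item, so a message is
        # acknowledged only after the caller has processed it; if the caller
        # stops early, the last message stays unacknowledged.
        consumer.acknowledge(msg)


class FakeMessage:
    def __init__(self, value):
        self._value = value

    def value(self):
        return self._value


class FakeConsumer:
    """Stands in for a Pulsar consumer; raises the timeout error when empty."""

    def __init__(self, values):
        self._pending = [FakeMessage(v) for v in values]
        self.acked = []

    def receive(self, timeout_millis=None):
        if not self._pending:
            raise Exception(TIMEOUT_MESSAGE)
        return self._pending.pop(0)

    def acknowledge(self, msg):
        self.acked.append(msg.value())


consumer = FakeConsumer([{"id": 1}, {"id": 2}])
received = list(drain(consumer, 3000))
print(received)        # [{'id': 1}, {'id': 2}]
print(consumer.acked)  # [{'id': 1}, {'id': 2}]
```

With a real consumer this might be used as `for segment in drain(consumer, WAIT_SECONDS * 1000): pp(segment)`, keeping the timeout handling out of the processing code.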
The complete samples can be found in the samples directory:
- Producer: sender.py
- Consumer: receiver.py
- AVRO schema: Segment.avsc
- AVRO IDL: try.avdl