
Allow clients to enforce a maximum produce message size. This allows pro... #11

Merged (1 commit, Jan 10, 2013)
kafka/producer.py (111 changes: 70 additions & 41 deletions)

@@ -7,61 +7,91 @@
 import kafka.io
 import kafka.request_type
 
 
 class Producer(kafka.io.IO):
-  """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker."""
+  """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker.
 
-  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
+  :param topic: The topic to produce to.
+  :param partition: The broker partition to produce to.
+  :param host: The kafka host.
+  :param port: The kafka port.
+  :param max_message_sz: The maximum allowed size of a produce request (in bytes). [default: 1MB]
 
-  def __init__(self, topic, partition=0, host='localhost', port=9092):
-    """Setup a kafka producer.
+  """
 
-    :param topic: The topic to produce to.
-    :param partition: The broker partition to produce to.
-    :param host: The kafka host.
-    :param port: The kafka port.
+  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
 
-    """
+  def __init__(self, topic, partition=0, host='localhost', port=9092, max_message_sz=1048576):
     kafka.io.IO.__init__(self, host, port)
+    self.max_message_sz = max_message_sz
     self.topic = topic
     self.partition = partition
     self.connect()
 
-  def encode_request(self, messages):
-    """Encode a sequence of messages for sending to the broker.
+  def _pack_payload(self, messages):
+    """Pack a list of messages into a sendable buffer.
 
-    :param messages: A sequence of :class:`Message <kafka.message>` objects.
+    :param messages: A list of packed (length-prefixed) messages to send.
 
     """
-    # encode messages as::
-    #   <LEN: int>
-    #   <MESSAGE_BYTES>
-    encoded = [message.encode() for message in messages]
-    lengths = [len(em) for em in encoded]
-
-    # Build up the struct format.
-    mformat = '>' + ''.join(['i%ds' % l for l in lengths])
-
-    # Flatten the two lists to match the format.
-    message_set = struct.pack(mformat, *list(itertools.chain.from_iterable(zip(lengths, encoded))))
-
+    payload = ''.join(messages)
+    payload_sz = len(payload)
+    topic_sz = len(self.topic)
     # Create the request as::
     #   <REQUEST_SIZE: int>
     #   <REQUEST_ID: short>
    #   <TOPIC_SIZE: short>
     #   <TOPIC: bytes>
     #   <PARTITION: int>
     #   <BUFFER_SIZE: int>
     #   <BUFFER: bytes>
-    topic_len = len(self.topic)
-    mset_len = len(message_set)
-    pformat = '>HH%dsii%ds' % (topic_len, mset_len)
-    payload = struct.pack(pformat, self.PRODUCE_REQUEST_ID, topic_len, self.topic, self.partition, mset_len, message_set)
-    return struct.pack('>i%ds' % len(payload), len(payload), payload)
+    return struct.pack(
+      '>HH%dsii%ds' % (topic_sz, payload_sz),
+      self.PRODUCE_REQUEST_ID,
+      topic_sz,
+      self.topic,
+      self.partition,
+      payload_sz,
+      payload
+    )
 
+  def _pack_kafka_message(self, payload):
+    """Pack a payload in a format kafka expects."""
+    return struct.pack('>i%ds' % len(payload), len(payload), payload)
+
+  def encode_request(self, messages):
+    """Encode a sequence of messages for sending to a kafka broker.
+
+    Encoding a request can yield multiple kafka messages if the payloads exceed
+    the maximum produce size.
+
+    :param messages: An iterable of :class:`Message <kafka.message>` objects.
+    :rtype: A generator of packed kafka messages ready for sending.
+
+    """
+    encoded_msgs = []
+    encoded_msgs_sz = 0
+    for message in messages:
+      encoded = message.encode()
+      encoded_sz = len(encoded)
+      if encoded_sz + encoded_msgs_sz > self.max_message_sz:
+        yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
+        encoded_msgs = []
+        encoded_msgs_sz = 0
+      msg = struct.pack('>i%ds' % encoded_sz, encoded_sz, encoded)
+      encoded_msgs.append(msg)
+      encoded_msgs_sz += encoded_sz
+    if encoded_msgs:
+      yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
+
   def send(self, messages):
     """Send a :class:`Message <kafka.message>` or a sequence of `Messages` to the Kafka server."""
     if isinstance(messages, kafka.message.Message):
       messages = [messages]
-    return self.write(self.encode_request(messages))
+    for message in self.encode_request(messages):
+      sent = self.write(message)
+      if sent != len(message):
+        raise IOError('Failed to send kafka message - sent %s/%s bytes.' % (sent, len(message)))
 
   @contextlib.contextmanager
   def batch(self):
@@ -72,29 +102,28 @@ def batch(self):
 
 
 class BatchProducer(Producer):
-  """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing."""
+  """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing.
 
-  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
+  :param topic: The topic to produce to.
+  :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
+  :param partition: The broker partition to produce to.
+  :param host: The kafka host.
+  :param port: The kafka port.
 
-  def __init__(self, topic, batch_interval, partition=0, host='localhost', port=9092):
-    """Setup a batch kafka producer.
+  """
 
-    :param topic: The topic to produce to.
-    :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
-    :param partition: The broker partition to produce to.
-    :param host: The kafka host.
-    :param port: The kafka port.
+  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
 
-    """
+  def __init__(self, topic, batch_interval, partition=0, host='localhost', port=9092):
     Producer.__init__(self, topic, partition=partition, host=host, port=port)
     self.batch_interval = batch_interval
     self._message_queue = []
     self.event = threading.Event()
     self.lock = threading.Lock()
     self.timer = threading.Thread(target=self._interval_timer)
     self.timer.daemon = True
-    self.connect()
     self.timer.start()
+    self.connect()
     atexit.register(self.close)
 
   def _interval_timer(self):
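For anyone trying the change out, a minimal usage sketch of the new knob (the import style follows the tests; the topic name, message count, and 64 KB cap are made up for illustration):

    from kafka import message, producer

    # Cap each produce request at 64 KB instead of the 1 MB default.
    # send() drains encode_request(), which yields one framed produce
    # request per chunk of messages that fits under max_message_sz.
    p = producer.Producer('events', host='localhost', port=9092, max_message_sz=65536)
    p.send([message.Message('payload-%d' % i) for i in range(10000)])

Before this change the whole batch went out as one oversized request; with it, the batch is split across several requests, each under the cap.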
setup.py (2 changes: 1 addition & 1 deletion)

@@ -74,7 +74,7 @@
 
 setuptools.setup(
   name = 'pykafka',
-  version = '0.1.2',
+  version = '0.1.3',
   license = 'MIT',
   long_description = __doc__,
   author = "Dan Sully",
tests/test_producers.py (4 changes: 2 additions & 2 deletions)

@@ -18,7 +18,7 @@ def test_producer(self, fake_connect, fake_write):
     expected_msg = ('\x00\x00\x00$\x00\x00\x00\tFakeTopic\x00\x00\x00\x00'
                     '\x00\x00\x00\x0f\x00\x00\x00\x0b\x00\xcf\x02\xbb\\abc123')
     fake_socket = fake_connect.expects_call().with_args(('localhost', 1234)).returns_fake()
-    fake_write.expects_call().with_args(expected_msg)
+    fake_write.expects_call().with_args(expected_msg).returns(len(expected_msg))
     p = producer.Producer('FakeTopic', host='localhost', port=1234)
     p.send(message.Message('abc123'))
 
@@ -35,7 +35,7 @@ def test_batch_producer(self, fake_connect, fake_write):
     expected_msg_two = ('\x00\x00\x00$\x00\x00\x00\tFakeTopic\x00\x00\x00\x00\x00'
                         '\x00\x00\x0f\x00\x00\x00\x0b\x00\xf0\xb69\xb8jkl123')
     fake_socket = fake_connect.expects_call().with_args(('localhost', 1234)).returns_fake()
-    fake_write.expects_call().with_args(expected_msg_one).next_call().with_args(expected_msg_two)
+    fake_write.expects_call().with_args(expected_msg_one).returns(len(expected_msg_one)).next_call().with_args(expected_msg_two).returns(len(expected_msg_two))
     p = producer.BatchProducer('FakeTopic', batch_interval, host='localhost', port=1234)
     p.enqueue(message.Message('abc123'))
     p.enqueue(message.Message('def456'))
Expand Down