
Commit 0d5af6b
Merge pull request dsully#11 from sholsapp/add-max-message-size
Allow clients to enforce a maximum produce message size. This allows pro...
Dan Sully committed Jan 10, 2013
2 parents 46ba940 + 1aa1e34 commit 0d5af6b
Showing 3 changed files with 73 additions and 44 deletions.
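
In API terms, the change adds a single keyword argument to Producer. A minimal usage sketch (it assumes a broker listening on localhost:9092; the topic name and 512KB cap are illustrative):

import kafka.message
import kafka.producer

# Cap each produce request at 512KB instead of the new 1MB default.
# Batches that would exceed the cap are split into multiple requests.
p = kafka.producer.Producer('test-topic', max_message_sz=524288)
p.send([kafka.message.Message('hello'), kafka.message.Message('world')])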
111 changes: 70 additions & 41 deletions kafka/producer.py
@@ -7,61 +7,91 @@
 import kafka.io
 import kafka.request_type
 
 
 class Producer(kafka.io.IO):
-  """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker."""
+  """Class for sending data to a `Kafka <http://sna-projects.com/kafka/>`_ broker.
 
-  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
+  :param topic: The topic to produce to.
+  :param partition: The broker partition to produce to.
+  :param host: The kafka host.
+  :param port: The kafka port.
+  :param max_message_sz: The maximum allowed size of a produce request (in bytes). [default: 1MB]
 
-  def __init__(self, topic, partition=0, host='localhost', port=9092):
-    """Setup a kafka producer.
+  """
 
-    :param topic: The topic to produce to.
-    :param partition: The broker partition to produce to.
-    :param host: The kafka host.
-    :param port: The kafka port.
+  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
 
-    """
+  def __init__(self, topic, partition=0, host='localhost', port=9092, max_message_sz=1048576):
     kafka.io.IO.__init__(self, host, port)
+    self.max_message_sz = max_message_sz
     self.topic = topic
     self.partition = partition
     self.connect()
 
-  def encode_request(self, messages):
-    """Encode a sequence of messages for sending to the broker.
+  def _pack_payload(self, messages):
+    """Pack a list of messages into a sendable buffer.
 
-    :param messages: A sequence of :class:`Message <kafka.message>` objects.
+    :param messages: The encoded messages to pack into a request.
 
     """
-    # encode messages as::
-    #   <LEN: int>
-    #   <MESSAGE_BYTES>
-    encoded = [message.encode() for message in messages]
-    lengths = [len(em) for em in encoded]
-
-    # Build up the struct format.
-    mformat = '>' + ''.join(['i%ds' % l for l in lengths])
-
-    # Flatten the two lists to match the format.
-    message_set = struct.pack(mformat, *list(itertools.chain.from_iterable(zip(lengths, encoded))))
-
+    payload = ''.join(messages)
+    payload_sz = len(payload)
+    topic_sz = len(self.topic)
     # Create the request as::
     #   <REQUEST_SIZE: int>
     #   <REQUEST_ID: short>
     #   <TOPIC_SIZE: short>
     #   <TOPIC: bytes>
     #   <PARTITION: int>
     #   <BUFFER_SIZE: int>
     #   <BUFFER: bytes>
-    topic_len = len(self.topic)
-    mset_len = len(message_set)
-    pformat = '>HH%dsii%ds' % (topic_len, mset_len)
-    payload = struct.pack(pformat, self.PRODUCE_REQUEST_ID, topic_len, self.topic, self.partition, mset_len, message_set)
+    return struct.pack(
+      '>HH%dsii%ds' % (topic_sz, payload_sz),
+      self.PRODUCE_REQUEST_ID,
+      topic_sz,
+      self.topic,
+      self.partition,
+      payload_sz,
+      payload
+    )
+
+  def _pack_kafka_message(self, payload):
+    """Pack a payload in a format kafka expects."""
     return struct.pack('>i%ds' % len(payload), len(payload), payload)
 
+  def encode_request(self, messages):
+    """Encode a sequence of messages for sending to a kafka broker.
+
+    Encoding a request can yield multiple kafka messages if the payloads
+    exceed the maximum produce size.
+
+    :param messages: An iterable of :class:`Message <kafka.message>` objects.
+
+    :rtype: A generator of packed kafka messages ready for sending.
+
+    """
+    encoded_msgs = []
+    encoded_msgs_sz = 0
+    for message in messages:
+      encoded = message.encode()
+      encoded_sz = len(encoded)
+      if encoded_sz + encoded_msgs_sz > self.max_message_sz:
+        yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
+        encoded_msgs = []
+        encoded_msgs_sz = 0
+      msg = struct.pack('>i%ds' % encoded_sz, encoded_sz, encoded)
+      encoded_msgs.append(msg)
+      encoded_msgs_sz += encoded_sz
+    if encoded_msgs:
+      yield self._pack_kafka_message(self._pack_payload(encoded_msgs))
+
   def send(self, messages):
     """Send a :class:`Message <kafka.message>` or a sequence of `Messages` to the Kafka server."""
     if isinstance(messages, kafka.message.Message):
       messages = [messages]
-    return self.write(self.encode_request(messages))
+    for message in self.encode_request(messages):
+      sent = self.write(message)
+      if sent != len(message):
+        raise IOError('Failed to send kafka message - sent %s of %s bytes.' % (sent, len(message)))
 
   @contextlib.contextmanager
   def batch(self):
@@ -72,29 +72,28 @@ def batch(self):
 
 
 class BatchProducer(Producer):
-  """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing."""
+  """Class for batching messages to a `Kafka <http://sna-projects.com/kafka/>`_ broker with periodic flushing.
 
-  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
+  :param topic: The topic to produce to.
+  :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
+  :param partition: The broker partition to produce to.
+  :param host: The kafka host.
+  :param port: The kafka port.
 
-  def __init__(self, topic, batch_interval, partition=0, host='localhost', port=9092):
-    """Setup a batch kafka producer.
+  """
 
-    :param topic: The topic to produce to.
-    :param batch_interval: The amount of time (in seconds) to wait for messages before sending.
-    :param partition: The broker partition to produce to.
-    :param host: The kafka host.
-    :param port: The kafka port.
+  PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
 
-    """
+  def __init__(self, topic, batch_interval, partition=0, host='localhost', port=9092):
     Producer.__init__(self, topic, partition=partition, host=host, port=port)
     self.batch_interval = batch_interval
     self._message_queue = []
     self.event = threading.Event()
     self.lock = threading.Lock()
     self.timer = threading.Thread(target=self._interval_timer)
     self.timer.daemon = True
-    self.connect()
     self.timer.start()
+    self.connect()
     atexit.register(self.close)
 
   def _interval_timer(self):
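
The heart of the change is the accumulate-then-yield loop in encode_request above. The rule in isolation, as a standalone sketch (the encode helper and the 64-byte cap are illustrative stand-ins, not pykafka code; Python 2 string semantics, matching the codebase):

import struct

def encode(payload):
  # Stand-in for kafka.message.Message.encode(): length-prefix the payload.
  return struct.pack('>i%ds' % len(payload), len(payload), payload)

def chunk_requests(payloads, max_sz=64):
  # Accumulate encoded messages, yielding a batch before it would exceed max_sz.
  batch, batch_sz = [], 0
  for payload in payloads:
    msg = encode(payload)
    if len(msg) + batch_sz > max_sz:
      yield ''.join(batch)  # the real code packs and frames this buffer first
      batch, batch_sz = [], 0
    batch.append(msg)
    batch_sz += len(msg)
  if batch:
    yield ''.join(batch)

# Ten 20-byte payloads against a 64-byte cap come out as five requests,
# rather than one oversized request.
print len(list(chunk_requests(['x' * 20] * 10)))

One caveat, true of the sketch and the real loop alike: a new batch is started before the current message is appended, so a single message larger than the cap still goes out as its own oversized request rather than being rejected.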
2 changes: 1 addition & 1 deletion setup.py
@@ -74,7 +74,7 @@
 
 setuptools.setup(
   name = 'pykafka',
-  version = '0.1.2',
+  version = '0.1.3',
   license = 'MIT',
   long_description = __doc__,
   author = "Dan Sully",
4 changes: 2 additions & 2 deletions tests/test_producers.py
@@ -18,7 +18,7 @@ def test_producer(self, fake_connect, fake_write):
     expected_msg = ('\x00\x00\x00$\x00\x00\x00\tFakeTopic\x00\x00\x00\x00'
                     '\x00\x00\x00\x0f\x00\x00\x00\x0b\x00\xcf\x02\xbb\\abc123')
     fake_socket = fake_connect.expects_call().with_args(('localhost', 1234)).returns_fake()
-    fake_write.expects_call().with_args(expected_msg)
+    fake_write.expects_call().with_args(expected_msg).returns(len(expected_msg))
     p = producer.Producer('FakeTopic', host='localhost', port=1234)
     p.send(message.Message('abc123'))

@@ -35,7 +35,7 @@ def test_batch_producer(self, fake_connect, fake_write):
     expected_msg_two = ('\x00\x00\x00$\x00\x00\x00\tFakeTopic\x00\x00\x00\x00\x00'
                         '\x00\x00\x0f\x00\x00\x00\x0b\x00\xf0\xb69\xb8jkl123')
     fake_socket = fake_connect.expects_call().with_args(('localhost', 1234)).returns_fake()
-    fake_write.expects_call().with_args(expected_msg_one).next_call().with_args(expected_msg_two)
+    fake_write.expects_call().with_args(expected_msg_one).returns(len(expected_msg_one)).next_call().with_args(expected_msg_two).returns(len(expected_msg_two))
     p = producer.BatchProducer('FakeTopic', batch_interval, host='localhost', port=1234)
     p.enqueue(message.Message('abc123'))
     p.enqueue(message.Message('def456'))
