Message creation, encoding, and decoding performance #1210

Closed
shargan opened this issue Sep 11, 2017 · 6 comments

@shargan

shargan commented Sep 11, 2017

I'm working on a fairly high-throughput Python project (150-200k msg/s to start) that uses the kafka-python protocol via aiokafka. Profiling to find our bottlenecks showed Message instantiation, encoding, and decoding to be areas ripe for improvement.

This is an area I'd like to contribute to, but the design of kafka.protocol seems oriented more towards being a readable reference implementation than a high-performance one. Are updates in this area desirable even if they lead to superficially uglier code? Likewise, how important is backwards compatibility to you?

@shargan
Author

shargan commented Sep 11, 2017

To illustrate a backwards-incompatible change that offers a significant performance gain, consider this simple microbenchmark:

from timeit import timeit
from kafka.protocol.message import Message

SHORT_KEY = b"KEY"
SHORT_VALUE = b"VALUE"
LONG_KEY = SHORT_KEY * 1000
LONG_VALUE = SHORT_VALUE * 1000
TS = 1505171828210

def create(size):
    if size == "short":
        key = SHORT_KEY
        value = SHORT_VALUE
    elif size == "long":
        key = LONG_KEY
        value = LONG_VALUE
    return Message(value, key, magic=1, timestamp=TS)

def encode(size):
    m = short_created if size == "short" else long_created
    return m.encode()

def decode(size):
    encoded = short_encoded if size == "short" else long_encoded
    return Message.decode(encoded)

short_created = create("short")
long_created = create("long")
short_encoded = encode("short")
long_encoded = encode("long")

count = 1000000
tmpl = "%-25s%-8.2f%.0f"
print("%-25s%-8s%s" % ("Test", "Time", "Msg/s"))
for size in ("short", "long"):
    setup = "from __main__ import create, encode, decode, size"
    create_time = timeit("create(size)", setup=setup, number=count)
    print(tmpl % ("Create %s" % size, create_time, count / create_time))
    encode_time = timeit("encode(size)", setup=setup, number=count)
    print(tmpl % ("Encode %s" % size, encode_time, count / encode_time))
    decode_time = timeit("decode(size)", setup=setup, number=count)
    print(tmpl % ("Decode %s" % size, decode_time, count / decode_time))

The baseline on my development host looks like this:

$ python bench.py
Test                     Time    Msg/s
Create short             3.35    298683
Encode short             8.37    119542
Decode short             11.59   86265
Create long              3.50    285579
Encode long              11.68   85591
Decode long              13.88   72036

Simply removing the WeakMethod assignment within Message.__init__ and calling Message._encode_self() directly leads to a significant performance gain during Message instantiation (so both creation and decoding) and a modest but noticeable gain during encoding. On this host, creation throughput roughly doubles, decoding improves by about 20-24%, and encoding by about 10-16%:

$ git diff
diff --git a/bench.py b/bench.py
index 35edcb4..f2dd131 100755
--- a/bench.py
+++ b/bench.py
@@ -21,7 +21,7 @@ def create(size):

 def encode(size):
     m = short_created if size == "short" else long_created
-    return m.encode()
+    return m._encode_self()

 def decode(size):
     encoded = short_encoded if size == "short" else long_encoded
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 70d5b36..6d456e7 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -54,7 +54,6 @@ class Message(Struct):
         self.attributes = attributes
         self.key = key
         self.value = value
-        self.encode = WeakMethod(self._encode_self)

     @property
     def timestamp_type(self):

$ python bench.py
Test                     Time    Msg/s
Create short             1.76    569588
Encode short             7.19    139128
Decode short             9.36    106813
Create long              1.76    568607
Encode long              10.62   94206
Decode long              11.58   86337
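The instantiation cost in question can be reproduced in isolation, without kafka-python installed. The sketch below is only an approximation under stated assumptions: both classes are hypothetical stand-ins for Message, and the standard library's weakref.WeakMethod stands in for kafka-python's own WeakMethod helper, whose call semantics differ. Absolute timings will vary by host and Python version.

```python
import weakref
from timeit import timeit


class WithWeakMethod:
    """Hypothetical stand-in: binds an encode wrapper on every instance."""

    def __init__(self, value):
        self.value = value
        # Assumption: stdlib weakref.WeakMethod replaces kafka-python's helper.
        # Calling the stdlib wrapper returns the bound method (or None if the
        # object is gone), so encoding is spelled obj.encode()().
        self.encode = weakref.WeakMethod(self._encode_self)

    def _encode_self(self):
        return self.value


class WithoutWeakMethod:
    """Hypothetical stand-in: no per-instance wrapper, call _encode_self()."""

    def __init__(self, value):
        self.value = value

    def _encode_self(self):
        return self.value


def bench(cls, number=200_000):
    """Return the seconds taken to instantiate cls `number` times."""
    return timeit(lambda: cls(b"VALUE"), number=number)


if __name__ == "__main__":
    for cls in (WithWeakMethod, WithoutWeakMethod):
        print("%-20s%.3fs" % (cls.__name__, bench(cls)))
```

The extra work per instance is a bound-method creation plus a wrapper object allocation, which is why it shows up in both the create and decode rows of the benchmark.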

Naturally, making this change requires changing how one (or both) of the class-level Message.encode() and the instance-level message.encode() are called.
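The compatibility problem comes from one name serving two call styles. A minimal sketch of the pattern, using hypothetical classes rather than kafka-python's actual code: an instance attribute named encode shadows a class-level encode(cls, item), so removing the assignment changes what message.encode() means.

```python
class Struct:
    @classmethod
    def encode(cls, item):
        """Class-level style: the item to encode is passed as an argument."""
        return b"struct:" + item.value


class ShadowedMessage(Struct):
    def __init__(self, value):
        self.value = value
        # The instance attribute hides the inherited classmethod for this
        # object only. A plain bound method stored on self creates a reference
        # cycle, which is presumably why a weak wrapper was used instead.
        self.encode = self._encode_self

    def _encode_self(self):
        return b"self:" + self.value


class PlainMessage(Struct):
    """Same class with the per-instance assignment removed."""

    def __init__(self, value):
        self.value = value

    def _encode_self(self):
        return b"self:" + self.value


shadowed = ShadowedMessage(b"v")
plain = PlainMessage(b"v")

print(shadowed.encode())                 # instance style: b'self:v'
print(ShadowedMessage.encode(shadowed))  # class style: b'struct:v'
print(plain._encode_self())              # direct call: b'self:v'

try:
    plain.encode()  # now resolves to the classmethod, which needs `item`
except TypeError as exc:
    print("TypeError:", exc)
```

Once the assignment is gone, existing callers of message.encode() break with a TypeError unless they switch to _encode_self() or the class-level method is renamed.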

Is this sort of change something you'd be open to?

@dpkp
Owner

dpkp commented Sep 12, 2017 via email

@tvoinarovskyi
Collaborator

@dpkp I am interested in that area too, but #1185 changes encode/decode substantially, so enhancements should probably follow after it (and cover both versions of records).

@shargan
Author

shargan commented Sep 12, 2017

Gotcha. I'll postpone this work until #1185 is complete, then.

@dpkp
Owner

dpkp commented Jan 12, 2018

Note that #1185 has now landed, so it should be safe to work on this.

@dpkp
Owner

dpkp commented Dec 29, 2019

kafka.protocol.message is no longer used (replaced by kafka.records.*), so I'm going to close this.

dpkp closed this as completed Dec 29, 2019