Message creation, encoding, and decoding performance #1210
To illustrate a backwards-incompatible change that offers a significant performance gain, consider this simple microbenchmark:

```python
from timeit import timeit

from kafka.protocol.message import Message

SHORT_KEY = b"KEY"
SHORT_VALUE = b"VALUE"
LONG_KEY = SHORT_KEY * 1000
LONG_VALUE = SHORT_VALUE * 1000
TS = 1505171828210


def create(size):
    if size == "short":
        key = SHORT_KEY
        value = SHORT_VALUE
    elif size == "long":
        key = LONG_KEY
        value = LONG_VALUE
    return Message(value, key, magic=1, timestamp=TS)


def encode(size):
    m = short_created if size == "short" else long_created
    return m.encode()


def decode(size):
    encoded = short_encoded if size == "short" else long_encoded
    return Message.decode(encoded)


short_created = create("short")
long_created = create("long")
short_encoded = encode("short")
long_encoded = encode("long")

count = 1000000
tmpl = "%-25s%-8.2f%.0f"
print("%-25s%-8s%s" % ("Test", "Time", "Msg/s"))
for size in ("short", "long"):
    setup = "from __main__ import create, encode, decode, size"
    create_time = timeit("create(size)", setup=setup, number=count)
    print(tmpl % ("Create %s" % size, create_time, count / create_time))
    encode_time = timeit("encode(size)", setup=setup, number=count)
    print(tmpl % ("Encode %s" % size, encode_time, count / encode_time))
    decode_time = timeit("decode(size)", setup=setup, number=count)
    print(tmpl % ("Decode %s" % size, decode_time, count / decode_time))
```

The baseline on my development host looks like this:

```
$ python bench.py
Test                     Time    Msg/s
Create short             3.35    298683
Encode short             8.37    119542
Decode short             11.59   86265
Create long              3.50    285579
Encode long              11.68   85591
Decode long              13.88   72036
```

Simply removing the `WeakMethod` assignment within `Message.__init__` and calling `Message._encode_self()` directly leads to a significant performance gain during `Message` instantiation (so both creation and decoding) and a modest but noticeable performance gain during encoding:

```diff
diff --git a/bench.py b/bench.py
index 35edcb4..f2dd131 100755
--- a/bench.py
+++ b/bench.py
@@ -21,7 +21,7 @@ def create(size):

 def encode(size):
     m = short_created if size == "short" else long_created
-    return m.encode()
+    return m._encode_self()

 def decode(size):
     encoded = short_encoded if size == "short" else long_encoded
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 70d5b36..6d456e7 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -54,7 +54,6 @@ class Message(Struct):
         self.attributes = attributes
         self.key = key
         self.value = value
-        self.encode = WeakMethod(self._encode_self)

     @property
     def timestamp_type(self):
```

```
$ python bench.py
Test                     Time    Msg/s
Create short             1.76    569588
Encode short             7.19    139128
Decode short             9.36    106813
Create long              1.76    568607
Encode long              10.62   94206
Decode long              11.58   86337
```

Naturally, making this change requires switching up how one (or both) of `Message.encode()` and `message.encode()` are called. Is this sort of change something you'd be open to?

I'm very interested in performance improvements. Currently I mostly recommend using PyPy, as it typically yields much higher throughput after the JIT warms up.
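The per-instance cost of that `WeakMethod` binding can be reproduced outside kafka-python. Below is a minimal sketch using the stdlib `weakref.WeakMethod` (kafka-python vendors its own directly-callable variant, so this is only an approximation; the `WithWeak`/`WithoutWeak` class names are hypothetical):

```python
import weakref
from timeit import timeit


class WithWeak:
    """Binds a per-instance WeakMethod in __init__, mimicking Message."""

    def __init__(self):
        # Allocates a weakref wrapper on every instantiation.
        self.encode = weakref.WeakMethod(self._encode_self)

    def _encode_self(self):
        return b"payload"


class WithoutWeak:
    """No per-instance binding; callers invoke _encode_self directly."""

    def _encode_self(self):
        return b"payload"


# Note: the stdlib WeakMethod must be dereferenced before calling, i.e.
# instance.encode()(), whereas kafka-python's vendored version is callable.
n = 100000
print("with WeakMethod:    %.3fs" % timeit(WithWeak, number=n))
print("without WeakMethod: %.3fs" % timeit(WithoutWeak, number=n))
```

On CPython the extra weakref allocation in `__init__` typically dominates the cost of an otherwise-empty constructor, which matches the creation-time improvement in the numbers above.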
Gotcha. I'll postpone this work until #1185 is complete, then.
Note that #1185 has now landed, so it should be safe to work on this now.
I'm working on a fairly high-throughput Python project (150-200k msg/s to start) that uses the kafka-python protocol via aiokafka. While profiling to determine where our bottlenecks are, Message instantiation, encoding, and decoding stand out as areas ripe for improvement.
This is an area I'd like to contribute to, but the design of kafka.protocol seems more oriented towards being a readable reference implementation than a high-performance implementation. Are updates in this area desirable even if they lead to superficially uglier code? Likewise, how important is backwards compatibility to you?
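For locating hotspots like these, a small `cProfile` harness can be sketched as follows (the `profile_top` helper is my own illustration, not part of kafka-python or aiokafka; the lambda stands in for whatever message-processing loop you want to measure):

```python
import cProfile
import io
import pstats


def profile_top(fn, limit=10):
    """Run fn under cProfile and return a text report of the most
    expensive calls, sorted by cumulative time."""
    pr = cProfile.Profile()
    pr.enable()
    fn()
    pr.disable()
    out = io.StringIO()
    pstats.Stats(pr, stream=out).sort_stats("cumulative").print_stats(limit)
    return out.getvalue()


# Example: profile an arbitrary hot loop in place of message encoding.
report = profile_top(lambda: sum(i * i for i in range(100000)))
print(report)
```

Sorting by cumulative time surfaces wrappers like `Message.encode` whose cost is spread across callees, which is how per-call overheads such as the `WeakMethod` indirection tend to show up.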