Skip to content

Commit

Permalink
Allow topic separator to be redefined and fix publish calls for py3
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha committed Mar 29, 2022
1 parent 678522b commit 7bd075e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
2 changes: 2 additions & 0 deletions txzmq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ZmqConnection(object):
:vartype multicastRate: int
:var highWaterMark: hard limit on the maximum number of outstanding
messages 0MQ shall queue in memory for any single peer
:var topicSep: bytes
:vartype highWaterMark: int
:var tcpKeepalive: if set to 1, enable TCP keepalive, otherwise leave
it as default
Expand Down Expand Up @@ -99,6 +100,7 @@ class ZmqConnection(object):
allowLoopbackMulticast = False
multicastRate = 100
highWaterMark = 0
topicSep = b'\0'

# Only supported by zeromq3 and pyzmq>=2.2.0.1
tcpKeepalive = 0
Expand Down
12 changes: 8 additions & 4 deletions txzmq/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def publish(self, message, tag=b''):
:param tag: message tag
:type tag: str
"""
self.send(tag + b'\0' + message)
if isinstance(tag, str):
tag = tag.encode()
if isinstance(message, str):
message = message.encode()
self.send(tag + self.topicSep + message)


class ZmqSubConnection(ZmqConnection):
Expand All @@ -44,7 +48,7 @@ def subscribe(self, tag):
:param tag: message tag
:type tag: str
"""
self.socket.set(constants.SUBSCRIBE, tag)
self.socket.setsockopt(constants.SUBSCRIBE, tag)

def unsubscribe(self, tag):
"""
Expand All @@ -55,7 +59,7 @@ def unsubscribe(self, tag):
:param tag: message tag
:type tag: str
"""
self.socket.set(constants.UNSUBSCRIBE, tag)
self.socket.setsockopt(constants.UNSUBSCRIBE, tag)

def messageReceived(self, message):
"""
Expand All @@ -71,7 +75,7 @@ def messageReceived(self, message):
# of multi-part message
self.gotMessage(message[1], message[0])
else:
self.gotMessage(*reversed(message[0].split(b'\0', 1)))
self.gotMessage(*reversed(message[0].split(self.topicSep, 1)))

def gotMessage(self, message, tag):
"""
Expand Down

0 comments on commit 7bd075e

Please sign in to comment.