[ZMQ] append a message sequence number to every ZMQ notification #7762
Concept ACK, I like adding the sequence number as an extra part. Also OK with using 32-bit sequence numbers.
I thought a subscriber can't miss a message. Edit: But it can: http://stackoverflow.com/a/15821036. So what can a subscriber do if it detects a missing message?
@promag:
The way we are using ZMQ, the sender will never block. This is sensible as we don't want to hang the application due to a slow client. But this means at some point one of the send queues will fill up and new messages will be discarded.
That's up to the application. Either stop with a fatal error, or re-start/re-sync. An event application usually does some synchronization at start-up, then starts processing incremental changes. After getting out of sync it can repeat the process. The important thing is being able to detect it and thus act on it.
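A minimal sketch of the detect-then-resync pattern described above, assuming each notification carries an unsigned 32-bit sequence number that increments by one per topic and wraps at 2^32. The class name `SequenceTracker` and its `observe` method are hypothetical and not part of bitcoind or any ZMQ binding; what happens on a gap (abort or resync) is left to the application.

```python
class SequenceTracker:
    """Tracks the last sequence number seen per topic (hypothetical helper)."""

    MODULUS = 2 ** 32  # assumption: unsigned 32-bit counter that wraps around

    def __init__(self):
        self.last_seen = {}

    def observe(self, topic, seq):
        """Record seq for topic and return how many messages were skipped.

        Returns 0 for the first message on a topic and for an in-order
        message; any non-zero value means the subscriber is out of sync.
        """
        previous = self.last_seen.get(topic)
        self.last_seen[topic] = seq
        if previous is None:
            return 0
        expected = (previous + 1) % self.MODULUS
        return (seq - expected) % self.MODULUS


tracker = SequenceTracker()
for topic, seq in [(b"hashtx", 0), (b"hashtx", 1), (b"hashtx", 4)]:
    if tracker.observe(topic, seq):
        print("gap detected on", topic, "- resync required")
```

Because the difference is taken modulo 2^32, a publisher restart that resets the counter also shows up as a non-zero result, which is the case where a resync is wanted anyway.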
This needs mention in
body = msg[1]
nseq = msg[2]
msgSequence = struct.unpack('<I', str(msg[-1]))[-1]
What is the reason for using `str()` here?
Fixed @MarcoFalke nit.
@@ -47,11 +48,17 @@ def run_test(self):
        print "listen..."
        msg = self.zmqSubSocket.recv_multipart()
        topic = msg[0]
        assert_equal(topic, "hashtx")
Nit: Add the `b` prefix, which is a no-op in py2? `b"hashtx"`
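To illustrate the nit: under Python 3, pyzmq's `recv_multipart()` returns `bytes` frames by default, and `bytes` never compare equal to `str`, so a topic check only passes with the `b` prefix. The snippet uses a hand-written frame list as a stand-in for a received message; the payload values are made up.

```python
# Stand-in for the list returned by recv_multipart(); values are illustrative.
msg = [b"hashtx", b"\x00" * 32, b"\x00\x00\x00\x00"]
topic = msg[0]

matches_bytes = (topic == b"hashtx")  # True on Python 2 and Python 3
matches_text = (topic == "hashtx")    # True on Python 2, False on Python 3
```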
Thanks for fixing!
@MarcoFalke: fixed nits.
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
static int zmq_send_multipart_keepalive(void *sock, const void* data, size_t size, ...)
Why the rename to `_keepalive`?
I renamed it to `_keepalive` because it does not "close" the multipart message. All messages are sent with `ZMQ_SNDMORE`, and I wanted to increase awareness of that by renaming the function.
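A rough Python sketch of the behaviour described here, not the PR's C++ code: a helper that sends every part with the "more" flag leaves the multipart message open, so the caller has to send one final part without the flag to close it. The function names and the `RecordingSocket` test double are hypothetical; the only assumption about a real socket is a `send(data, flags)` method, which pyzmq sockets provide.

```python
SNDMORE = 2  # value of libzmq's ZMQ_SNDMORE flag; pyzmq exposes it as zmq.SNDMORE


def send_parts_keepalive(sock, parts):
    """Send every part with SNDMORE, leaving the multipart message open."""
    for part in parts:
        sock.send(part, SNDMORE)


def send_notification(sock, topic, body, seq_bytes):
    """Send topic and body, then close the message with the sequence part."""
    send_parts_keepalive(sock, [topic, body])
    sock.send(seq_bytes, 0)  # no SNDMORE: this final part ends the message


class RecordingSocket:
    """Test double that records (data, flags) instead of using a real socket."""

    def __init__(self):
        self.sent = []

    def send(self, data, flags=0):
        self.sent.append((data, flags))


sock = RecordingSocket()
send_notification(sock, b"hashtx", b"\x01" * 32, b"\x05\x00\x00\x00")
flags_used = [flags for _, flags in sock.sent]
```

Here `flags_used` ends up as two `SNDMORE` entries followed by `0`, which is the pattern the `_keepalive` name was meant to call attention to.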
Here's a slight simplification of the code - gets rid of some duplication, as well as the `_keepalive` change:
…fication
0b25a9f [ZMQ] append a message sequence number to every ZMQ notification (Jonas Schnelli)
de821d5 [ZMQ] refactor message string (Jonas Schnelli)
Could we make that parameterizable?
I don't believe ZMQ ever guarantees reliability.
Well, notifications can be lost, through ZMQ or otherwise, for example if bitcoind needs to be restarted, or a myriad of other circumstances not under your control. At least you can detect it now:
Your application needs an (application dependent) way to resync anyhow. This is better than pretending to guarantee something.
Add ZeroMQ notifications

Cherry-picked from the following upstream PRs:
- bitcoin/bitcoin#6103
- bitcoin/bitcoin#6684
- bitcoin/bitcoin#6686
- bitcoin/bitcoin#6736
- bitcoin/bitcoin#6739
- bitcoin/bitcoin#6743
- bitcoin/bitcoin#6768
- bitcoin/bitcoin#6779
- bitcoin/bitcoin#6810
- bitcoin/bitcoin#6927
- bitcoin/bitcoin#6980 (only upgrading zeromq)
- bitcoin/bitcoin#6680
- bitcoin/bitcoin#7058
- bitcoin/bitcoin#7621
- bitcoin/bitcoin#7335 (only parts affecting `zmq_test.py`)
- bitcoin/bitcoin#7853 (only parts affecting `zmq_test.py`)
- bitcoin/bitcoin#7762
- bitcoin/bitcoin#7993 (only upgrading zeromq)
- bitcoin/bitcoin#8238
- bitcoin/bitcoin#8701
- bitcoin/bitcoin#6685

Closes #2020.
Currently, ZMQ listeners cannot detect if they have missed a message.
This PR adds a sequence number to each message (each message type has its own counter).
The sequence number is appended as the last part of the multipart ZMQ message so as not to break the API, though we could consider breaking the API in favor of moving the sequence number to the very beginning.
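A small sketch of the message layout described above, assuming the sequence number is encoded as a little-endian unsigned 32-bit integer (as the `'<I'` format in the test code quoted in this thread suggests). The function names and the plain `dict` used for the per-topic counters are hypothetical; this mimics the framing only and does not touch a real socket.

```python
import struct


def build_notification(counters, topic, body):
    """Return [topic, body, seq] and advance the per-topic counter."""
    seq = counters.get(topic, 0)
    counters[topic] = (seq + 1) % 2 ** 32  # wrap like an unsigned 32-bit int
    return [topic, body, struct.pack("<I", seq)]


def parse_notification(frames):
    """Split frames into (topic, body, seq); seq is read from the last part."""
    topic, body = frames[0], frames[1]
    (seq,) = struct.unpack("<I", frames[-1])
    return topic, body, seq


counters = {}
first = build_notification(counters, b"hashtx", b"aa")
second = build_notification(counters, b"hashtx", b"bb")
other = build_notification(counters, b"hashblock", b"cc")
```

Reading the sequence number from `frames[-1]` means a subscriber that only looks at the first two parts keeps working unchanged, which is the compatibility argument for appending rather than prepending.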
Todo: