-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add listener support for consumer #56
Conversation
e060a87
to
7951b2d
Compare
It seems that the new I'm also looking for a better way to route the |
@gperinazzo |
retest this please |
I can receive only one message when
consumer_listener.js
|
Yes, that is a problem I had to put in since the pulsar client expects the message listener to be synchronous. The consumer pointer it passes into the C callback is only valid until the function returns, so I can't allow the ack function to hold a reference to it after it ends. Currently, I'll wait until either the ack function gets garbage collected or ack is called. So you do eventually get all messages, it's just that if you don't call it, you'll have to wait until all ack is garbage collected for every message (and that is pretty slow). I checked how the Go library deals with this and it seems that it just leaks a bit of memory in order to pass the original consumer pointer into the message listener. I don't know if that would be acceptable. I added a third argument recently due to this, it just releases the message listener callback without acking the message in order to implement a timeout. Any help in solving this issue without that downside is welcome. One option is to try to pass the original consumer pointer into the callback, but I could not find a way to free the memory after.
I'll update the test dockerfile to install node 12 over the weekend. That should clear the CI issues. |
The docker image used in CI was upgraded 5 days ago, and the nodejs version was also upgraded from According to nodejs/node#28643 (comment), it seems that the thread-safe function is already stable in N-API v4, so there is no need to upgrade nodejs to v12.
|
Didn't see that, the last test passed. |
I found a way to pass the original consumer pointer into the message listener, so it doesn't have to wait for the ack to be called. This makes your code snippet work as expected. However, while doing so, I hit the problem described by nodejs/node-addon-api#592 that lead to a crash after a ThreadSafeFunction was released. I tested with the current master for node-addon-api and it works without any issues. This should be blocked until the next release of node-addon-api, but if possible could you have a look at the code and give any feedback you can? |
I see. |
Please tell me, why do you not pass consumer to listener like other languages? example:
|
The code before used the consumer pointer given to me by the message listener callback and I couldn't call free on that, so I avoided creating a consumer from it (as the consumer will free the pointer). But now I can create one from the shared I'll change it to pass a consumer. |
30d2267
to
5a04aeb
Compare
@hrsakai The 2.0.0 version of node-addon-api was release, and the callback now passes the consumer instead of an ack function! Could you please check this again? Any suggestions on how to organize the code is welcome, I had to place a few things inside the ConsumerConfiguration because the listener must be created with the configuration, but I'm really not happy about it. |
783909f
to
de32b76
Compare
de32b76
to
dbda873
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gperinazzo
LGTM
Thanks for your contribution.
Adds support for a listener function to the Consumer, using the C++ client listener support.
Using
Receive
currently will block a thread from the Node worker poll. If enough consumers are waiting to receive messages from pulsar at the same time, this will completely clog the worker poll.Since #14,
node-addon-api
has added support forThreadSafeFunction
's, allowing callbacks to be made from native threads. This pull request updates thenode-addon-api
dependency version to use this, but that will increase the minimum supported node version.This paves the way to use the async functions from the pulsar C++ client properly.