Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add listener support for consumer #56

Merged
merged 1 commit into from
Jan 7, 2020

Conversation

gperinazzo
Copy link
Contributor

Adds support for a listener function to the Consumer, using the C++ client listener support.

Using Receive currently will block a thread from the Node worker poll. If enough consumers are waiting to receive messages from pulsar at the same time, this will completely clog the worker poll.

Since #14, node-addon-api has added support for ThreadSafeFunction's, allowing callbacks to be made from native threads. This pull request updates the node-addon-api dependency version to use this, but that will increase the minimum supported node version.

This paves the way to use the async functions from the pulsar C++ client properly.

@gperinazzo gperinazzo force-pushed the consumer-listener branch 2 times, most recently from e060a87 to 7951b2d Compare October 3, 2019 03:59
@gperinazzo
Copy link
Contributor Author

It seems that the new ThreadSafeFunction requires N-API 4 or higher, which isn't available for Node 10.x unless we enable experimental features.

I'm also looking for a better way to route the pulsar_consumer_t to the callback function, as we can't save the one that the library gives us.

@hrsakai
Copy link
Contributor

hrsakai commented Oct 29, 2019

@gperinazzo
Sorry for the late reply and thanks for a PR.
I will review this PR.

@hrsakai
Copy link
Contributor

hrsakai commented Oct 29, 2019

retest this please

@hrsakai
Copy link
Contributor

hrsakai commented Oct 30, 2019

I can receive only one message when ack(message) doesn’t be called.

// Execute consumer
$ node ./consumer_listener.js
results.length: 1

// Produce messages on another terminal
$ pulsar-client produce -n 100 -m Hello persistent://public/ns/topic1

consumer_listener.js

.
.
  const consumer = await client.subscribe({
    topic: 'persistent://public/ns/topic1',
    subscription: 'sub1',
    listener: (message, ack) => {
      const data = message.getData().toString();
      results.push(data);
      console.log(`results.length: ${results.length}`);
//      ack(message);
      if (results.length === 10000) finish();
    },
  });
.
.

@gperinazzo
Copy link
Contributor Author

I can receive only one message when ack(message) doesn’t be called.

Yes, that is a problem I had to put in since the pulsar client expects the message listener to be synchronous. The consumer pointer it passes into the C callback is only valid until the function returns, so I can't allow the ack function to hold a reference to it after it ends.

Currently, I'll wait until either the ack function gets garbage collected or ack is called. So you do eventually get all messages, it's just that if you don't call it, you'll have to wait until all ack is garbage collected for every message (and that is pretty slow).

I checked how the Go library deals with this and it seems that it just leaks a bit of memory in order to pass the original consumer pointer into the message listener. I don't know if that would be acceptable. I added a third argument recently due to this, it just releases the message listener callback without acking the message in order to implement a timeout.

Any help in solving this issue without that downside is welcome. One option is to try to pass the original consumer pointer into the callback, but I could not find a way to free the memory after.

retest this please

I'll update the test dockerfile to install node 12 over the weekend. That should clear the CI issues.

@hrsakai
Copy link
Contributor

hrsakai commented Oct 30, 2019

I'll update the test dockerfile to install node 12 over the weekend. That should clear the CI issues.

The docker image used in CI was upgraded 5 days ago, and the nodejs version was also upgraded from v10.14.1 to v10.17.0.
Since nodejs v10.17.0 supports N-API v4, there is no need to update the test dockerfile.
https://hub.docker.com/layers/apachepulsar/pulsar-build/ubuntu-16.04/images/sha256-d03f66e312923a5e84936d92cfa0af6c95977e5cd4ecd4edb42aa808d543eeab

According to nodejs/node#28643 (comment), it seems that the thread-safe function is already stable in N-API v4, so there is no need to upgrade nodejs to v12.

The thread-safe function was marked as stable when N-API version 4 was released. 
At that time, all thread-safe function N-APIs were marked as stable, however, the overall heading (addressed in this PR) was forgotten.

@gperinazzo
Copy link
Contributor Author

gperinazzo commented Oct 30, 2019

The docker image used in CI was upgraded 5 days ago, and the nodejs version was also upgraded from v10.14.1 to v10.17.0.

Didn't see that, the last test passed.

@gperinazzo
Copy link
Contributor Author

I found a way to pass the original consumer pointer into the message listener, so it doesn't have to wait for the ack to be called. This makes your code snippet work as expected.

However, while doing so, I hit the problem described by nodejs/node-addon-api#592 that lead to a crash after a ThreadSafeFunction was released. I tested with the current master for node-addon-api and it works without any issues.

This should be blocked until the next release of node-addon-api, but if possible could you have a look at the code and give any feedback you can?
(CI is failing to download node-addon-api from github. It should work once they release the next version and we point to the new release)

@hrsakai
Copy link
Contributor

hrsakai commented Nov 7, 2019

I see.
I will check for new modifications.

@hrsakai
Copy link
Contributor

hrsakai commented Nov 12, 2019

Please tell me, why do you not pass consumer to listener like other languages?

example:

  const consumer = await client.subscribe({
    topic: 'persistent://public/any/topic1',
    subscription: 'sub1',
    listener: (message, consumer) => {
      consumer.acknowledge(message)
    },
  });

@gperinazzo
Copy link
Contributor Author

The code before used the consumer pointer given to me by the message listener callback and I couldn't call free on that, so I avoided creating a consumer from it (as the consumer will free the pointer). But now I can create one from the shared ConsumerWrapper and it won't free the consumer pointer.

I'll change it to pass a consumer.

@gperinazzo
Copy link
Contributor Author

@hrsakai The 2.0.0 version of node-addon-api was release, and the callback now passes the consumer instead of an ack function!

Could you please check this again? Any suggestions on how to organize the code is welcome, I had to place a few things inside the ConsumerConfiguration because the listener must be created with the configuration, but I'm really not happy about it.

@gperinazzo gperinazzo force-pushed the consumer-listener branch 2 times, most recently from 783909f to de32b76 Compare December 19, 2019 20:26
Copy link
Contributor

@hrsakai hrsakai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gperinazzo
LGTM
Thanks for your contribution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants