Listening for messages #62

Closed
mhaagens opened this issue Dec 16, 2019 · 16 comments

Comments

@mhaagens

How can I listen to incoming messages? The listener property on consumer doesn't seem to do anything.

@david-streamlio

@mhaagens In order to receive messages, you will want to use the subscribe method as shown in the following example: https://github.com/apache/pulsar-client-node/blob/master/examples/consumer.js

@mhaagens
Author

Thanks for the quick reply! I am using subscribe, but as far as I can see from the example, you have to manually call receive on the consumer in a loop to get messages?

@david-streamlio

@mhaagens You are correct. The subscribe command registers your consumer with the Pulsar broker, so the broker knows that you want to receive messages. After you have subscribed, you have to make a call as shown on line 39 of the example code, const msg = await consumer.receive();, to get a message. It is a blocking call that delivers one message (if and when one is available).

HTH

@mhaagens
Author

Okay, so am I understanding it correctly that there’s no way to set up a listener that processes messages as they come in? Something similar to this in kafka-node:

consumer.on('message', function (message) { console.log(message); });

@david-streamlio

@mhaagens Not yet, but there appears to be a PR in progress to add this capability: #56

@mhaagens
Author

Awesome, thanks!

@mhaagens
Author

Would this be a solution in the meantime?

while (true) {
   const msg = await consumer.receive();
   console.log(msg.getData().toString());
   consumer.acknowledge(msg);
}

@david-streamlio

@mhaagens Yes.
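
The receive loop above can be wrapped so that it behaves roughly like a listener and can be stopped cleanly. The following is only a sketch under stated assumptions: startListening and its return value are hypothetical names, not part of pulsar-client, and the consumer is assumed to expose only the receive() and acknowledge() calls already used in this thread.

```javascript
// Hypothetical helper (not part of pulsar-client): emulates a message
// listener on top of the receive() loop shown above.
// `consumer` only needs receive() and acknowledge().
function startListening(consumer, onMessage) {
  let running = true;

  const done = (async () => {
    while (running) {
      // Waits until the next message is available.
      const msg = await consumer.receive();
      try {
        await onMessage(msg);
        await consumer.acknowledge(msg);
      } catch (err) {
        // The message is not acknowledged here; whether it is redelivered
        // depends on the subscription settings.
        console.error('message handler failed:', err);
      }
    }
  })();

  return {
    // Resolves when the loop has exited.
    done,
    // Takes effect only after the receive() currently in flight has
    // delivered a message and that message has been handled.
    stop: () => {
      running = false;
      return done;
    },
  };
}
```

With a real consumer this might be used as const listener = startListening(consumer, (msg) => console.log(msg.getData().toString())); followed later by await listener.stop();. Note that stop() cannot interrupt a receive() that is still waiting for a message.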

@yosiat
Contributor

yosiat commented Mar 18, 2020

Hi!

I tried the listener implementation with the following code, which listens for messages indefinitely; once a key is pressed, we close the consumer and the client (in real life, instead of waiting for a keypress, we would listen for shutdown signals):

const Pulsar = require('pulsar-client');

const readline = require('readline');

// createInterface is a factory function, so `new` is not needed
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

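function prompt() {
  return new Promise((resolve) => {
    // rl.question resolves once a line is entered, i.e. after pressing Enter
    rl.question('Press Enter to stop consuming: ', () => {
      rl.close(); // release stdin so the process can exit
      resolve();
    });
  });
}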

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
    //operationTimeoutSeconds: 30,
  });

  // Create a consumer
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
    listener: function(msg, consumer) {
      console.log(msg.getData().toString());
      consumer.acknowledge(msg);
    }
  });

  await prompt();
  await consumer.close();
  await client.close();

})();
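
The "listen to shutdown signals" variant mentioned above could look roughly like the sketch below. waitForShutdown is a hypothetical helper name; it relies only on Node's standard process signal events and is an illustration, not how the client itself handles shutdown.

```javascript
// Hypothetical helper: returns a promise that resolves with the signal
// name once the process receives one of the given signals.
function waitForShutdown(signals = ['SIGINT', 'SIGTERM']) {
  return new Promise((resolve) => {
    const handlers = new Map();

    const onSignal = (signal) => {
      // Remove our handlers so that a second signal falls back to
      // Node's default behaviour (terminating the process).
      for (const [sig, handler] of handlers) {
        process.removeListener(sig, handler);
      }
      resolve(signal);
    };

    for (const sig of signals) {
      const handler = () => onSignal(sig);
      handlers.set(sig, handler);
      process.on(sig, handler);
    }
  });
}
```

In the script above, await prompt(); would then be replaced with await waitForShutdown();, leaving the consumer.close() and client.close() calls unchanged.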

I have a producer that emits messages at regular intervals (after some time it fails with #78 and I restart it).

After some random amount of time, the consumer just closes the connection. I added std::cout to the code and I see that the Consumer destructor is called for every message (which makes sense, since we create a consumer for every message) and the Client destructor is called after a while.

I made some changes to solve this and it works now, except that the listener doesn't get the consumer instance.

Changes are here - master...yosiat:consumer-listener-bug

If my changes make sense to you, I am happy to open a PR and move the discussion over there.

@sijie
Member

sijie commented Mar 18, 2020

@yosiat Yes, this approach looks good to me. Can you create a pull request for your proposed change?

@yosiat
Contributor

yosiat commented Mar 19, 2020

@sijie created a PR, here: #83

@gperinazzo
Contributor

@yosiat After your pull request, I'm seeing a crash if a consumer with a listener is closed twice:

const Pulsar = require('./index.js');

const delay = (time) => new Promise(resolve => setTimeout(resolve, time));

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
  });

  let count = 0;

  // Create a consumer
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
    listener: function(msg, consumerArg) {
      console.log(new Date().toLocaleString(), msg.getData().toString());
      delay(1000).then(() => {
        consumerArg.acknowledge(msg);
        consumerArg.close();
      })
    }
  });
})();

Crashes with the following output:

FATAL ERROR: Error::New napi_get_last_error_info
 1: 0x555d20446711 node::Abort() [node]
2020-04-13 13:04:10.834 INFO  ConsumerImpl:894 | [persistent://public/default/my-topic, sub1, 0] Closed consumer 0
 2: 0x555d20386273 node::FatalError(char const*, char const*) [node]
 3: 0x555d2038627c  [node]
 4: 0x555d20416d11 napi_fatal_error [node]
 5: 0x7f7ab97e6f1a  [/home/guilherme/Projects/pulsar-client-node/build/Release/Pulsar.node]
 6: 0x7f7ab982af2a Consumer::Close(Napi::CallbackInfo const&) [/home/guilherme/Projects/pulsar-client-node/build/Release/Pulsar.node]
 7: 0x7f7ab98313c6 Napi::ObjectWrap<Consumer>::InstanceMethodCallbackWrapper(napi_env__*, napi_callback_info__*) [/home/guilherme/Projects/pulsar-client-node/build/Release/Pulsar.node]
 8: 0x555d203fbf05  [node]
 9: 0x555d2062b657 v8::internal::FunctionCallbackArguments::Call(v8::internal::CallHandlerInfo) [node]
10: 0x555d2062ba21  [node]
11: 0x555d2062c2da  [node]
12: 0x555d2062cbf6 v8::internal::Builtin_HandleApiCall(int, unsigned long*, v8::internal::Isolate*) [node]
13: 0x555d20db6e99  [node]
Aborted (core dumped)

Before the PR, the second call to Close would fail with an Already Closed error, but without crashing Node.
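
Until the native crash is fixed, calling code can avoid triggering a second Close from its own side. The sketch below is a JavaScript-level guard only; closeOnce is a hypothetical helper, not part of pulsar-client, and it does not change the behaviour of the native binding.

```javascript
// Hypothetical helper: wraps any object with a close() method so that
// close() is invoked at most once. Later calls reuse the first promise.
function closeOnce(closable) {
  let closing = null;
  return () => {
    if (closing === null) {
      // Promise.resolve().then(...) turns a synchronous throw from
      // close() into a rejected promise.
      closing = Promise.resolve().then(() => closable.close());
    }
    return closing;
  };
}
```

The idea is to create const close = closeOnce(consumer); once and to call close() from every place that may trigger shutdown, instead of calling consumer.close() directly.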

I wrote the original listener implementation, thanks for cleaning it up! Most of the reason it was messy was that I couldn't find a better way to not crash and not leak any memory. It's really annoying due to how the pulsar C interface was made.

The problem is that the C interface creates a pulsar_consumer_t on the stack and passes it to the message listener (here). The consumer it receives is a shared pointer, but because the pulsar_consumer_t lives on the stack, the one passed to the message listener can't be used after the original message listener function has returned. I think the golang client wrapper's workaround for this leaks memory, but I'm not sure: it saves a pointer to the consumer and channel to be used by the listener, but I can't find where it deletes the pointer.

Maybe we could add a function to the C interface that clones a pulsar_consumer_t, creating a new shared pointer to the same C++ consumer, and that has to be freed manually? This would allow libraries like this to clone the consumer received by the message listener callback to use it later to ack the message. That would simplify wrapper libraries a lot.

@alphara

alphara commented Jul 2, 2020

The listener works for the consumer, but it does not work for the reader:

const reader = await client.createReader({
  topic: 'persistent://public/default/my-topic',
  startMessageId: Pulsar.MessageId.earliest(),
  listener: function (msg, consumer) {
    console.log(msg.getData().toString());
  }
});

Do you want to implement it?

@yosiat
Contributor

yosiat commented Jul 2, 2020

@alphara It looks like the C/C++ client library that we rely on doesn't support a listener for the reader interface.
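
Without a listener on the reader, messages can still be pulled in a loop. The sketch below assumes the reader object exposes hasNext() and readNext(); check this against the installed client version. readAll is a hypothetical helper name, and it only drains what hasNext() reports as available rather than listening indefinitely.

```javascript
// Hypothetical helper: reads messages while hasNext() reports more,
// passing each one to onMessage. Returns the number of messages read.
// Assumes `reader` exposes hasNext() and readNext().
async function readAll(reader, onMessage) {
  let count = 0;
  while (reader.hasNext()) {
    const msg = await reader.readNext();
    await onMessage(msg);
    count += 1;
  }
  return count;
}
```

With a reader created as in the snippet above (minus the listener option), this might be called as await readAll(reader, (msg) => console.log(msg.getData().toString()));.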

@gperinazzo
Contributor

I think it is supported on version 2.4.0. Here's the relevant function.

@hrsakai
Contributor

hrsakai commented Dec 15, 2021

The listener is now supported, so I am closing this issue.

@hrsakai hrsakai closed this as completed Dec 15, 2021