Listening for messages #62
Comments
@mhaagens In order to receive messages, you will want to use the
Thanks for the quick reply! I am using subscribe, but as far as I can see from the example, you have to manually call receive on the subscriber in a loop to get messages?
@mhaagens You are correct. The HTH
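The manual receive loop described in this exchange can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the library's own code: `consumeLoop`, `handler`, and `shouldStop` are hypothetical names, and the only thing assumed about the consumer is that it exposes `receive()` and `acknowledge()` as in the pulsar-client examples.

```javascript
// Hypothetical helper: pull messages one at a time until asked to stop.
// `consumer` is assumed to expose receive() and acknowledge(msg), like the
// object returned by client.subscribe() in the pulsar-client examples.
async function consumeLoop(consumer, handler, shouldStop) {
  let handled = 0;
  while (!shouldStop()) {
    const msg = await consumer.receive(); // waits for the next message
    await handler(msg);                   // application-specific processing
    await consumer.acknowledge(msg);      // ack only after the handler succeeds
    handled += 1;
  }
  return handled; // number of messages processed before stopping
}
```

Note that `shouldStop` is only checked between messages, so a loop like this cannot react to a stop request while it is still waiting inside `receive()`.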
Okay, so am I understanding correctly that there’s no way to set up a listener that processes messages as they come in? Similar to this in kafka-node:
Awesome, thanks!
Would this be a solution in the meantime?
@mhaagens Yes.
Hi! I tried the listener implementation with the following code, which listens for messages indefinitely and closes the consumer and client once any key is pressed (in real life, instead of waiting for a key, we would listen for shutdown signals):
const Pulsar = require('pulsar-client');
const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

// Resolves once the user submits a line (presses Enter).
function prompt() {
  return new Promise((resolve) => {
    rl.question(`press any key to stop consuming`, () => resolve());
  });
}

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
    //operationTimeoutSeconds: 30,
  });
  // Create a consumer whose listener is invoked for every incoming message
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
    listener: function(msg, consumer) {
      console.log(msg.getData().toString());
      consumer.acknowledge(msg);
    }
  });
  await prompt();
  rl.close(); // release stdin so the process can exit
  await consumer.close();
  await client.close();
})();
I have a producer which emits at regular intervals (after some time it fails on #78 and I restart it). After some random time, the consumer just closes the connection. I added I did some changes in order to solve this and it works now, except the Changes are here - master...yosiat:consumer-listener-bug If my changes make sense to you, I am happy to open a PR and move the discussion over there.
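The comment above mentions listening for shutdown signals instead of a key press. A minimal sketch of that idea, assuming only Node's standard `process` signal events, could look like this. `waitForShutdownSignal` and `closeOnSignal` are hypothetical helper names, and the consumer and client are only assumed to expose `close()`.

```javascript
// Hypothetical helper: resolves with the name of the first signal received.
function waitForShutdownSignal(signals = ['SIGINT', 'SIGTERM']) {
  return new Promise((resolve) => {
    const handlers = new Map();
    const onSignal = (signal) => {
      // Remove our handlers so later signals fall back to Node's defaults.
      for (const [name, fn] of handlers) process.removeListener(name, fn);
      resolve(signal);
    };
    for (const name of signals) {
      const fn = () => onSignal(name);
      handlers.set(name, fn);
      process.on(name, fn);
    }
  });
}

// Hypothetical helper: wait for a signal, then close the consumer before the client.
async function closeOnSignal(consumer, client) {
  const signal = await waitForShutdownSignal();
  await consumer.close();
  await client.close();
  return signal;
}
```

In the script above, `await closeOnSignal(consumer, client)` would take the place of the `prompt()` call and the two `close()` calls that follow it.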
@yosiat Yes, this approach looks good to me. Can you create a pull request for your proposed change?
@yosiat I'm seeing a crash if a client with a listener is closed twice after your pull request:
const Pulsar = require('./index.js');
const delay = (time) => new Promise(resolve => setTimeout(resolve, time));

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
  });
  let count = 0;
  // Create a consumer
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 10000,
    listener: function(msg, consumerArg) {
      console.log(new Date().toLocaleString(), msg.getData().toString());
      // Each delivered message schedules its own close(), so close() runs once per received message.
      delay(1000).then(() => {
        consumerArg.acknowledge(msg);
        consumerArg.close();
      });
    }
  });
})();
Crashes with the following output:
Before the PR, the second call to Close would fail with an Already Closed error, but without crashing Node. I wrote the original listener implementation, thanks for cleaning it up! Most of the reason it was messy was that I couldn't find a better way to avoid both crashing and leaking memory. It's really annoying because of how the Pulsar C interface was made. The problem is that the C interface creates a Maybe we could add a function to the C interface that clones a
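Until the binding itself tolerates repeated closes, calling code can avoid issuing a second `close()` at all. The following is a minimal sketch of such a guard, not a fix for the native crash: `closeOnce` is a hypothetical helper, and the wrapped object is only assumed to expose `close()`.

```javascript
// Hypothetical wrapper: returns a function that calls resource.close() at most
// once and hands every caller the same promise. It guards the calling code
// only and changes nothing inside the native binding.
function closeOnce(resource) {
  let closing = null;
  return () => {
    if (closing === null) {
      // Promise.resolve().then(...) also turns a synchronous throw into a rejection.
      closing = Promise.resolve().then(() => resource.close());
    }
    return closing;
  };
}
```

In a listener like the one in the reproduction, the function returned by `closeOnce` would be created once and called in place of `consumerArg.close()`, so a second message arriving before the first close completes reuses the pending promise.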
The listener works for the consumer, but it does not work for the reader:
Do you want to implement it?
@alphara It looks like the C/C++ client that we rely on doesn't support a listener for the reader interface.
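Where a reader listener is not available, a polling loop is one way to approximate it. The sketch below is a workaround under stated assumptions rather than a library feature: `drainReader` and `onMessage` are hypothetical names, and the reader is only assumed to expose `hasNext()` and `readNext()` as in the pulsar-client reader examples.

```javascript
// Hypothetical helper: emulate a listener for a reader by polling.
// Reads every message currently available and passes each to onMessage.
async function drainReader(reader, onMessage) {
  let count = 0;
  while (reader.hasNext()) {
    const msg = await reader.readNext();
    await onMessage(msg, reader); // same argument order as a consumer listener
    count += 1;
  }
  return count; // number of messages delivered in this pass
}
```

A loop like this returns once the reader has caught up, so a caller that wants continuous delivery would need to invoke it again later, for example on a timer.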
I think it is supported as of version 2.4.0. Here's the relevant function.
The listener is now supported, so I closed this issue.
How can I listen to incoming messages? The listener property on consumer doesn't seem to do anything.