Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail on send message and receive message #95

Closed
dantepsychedelico opened this issue Jun 19, 2020 · 1 comment
Closed

Fail on send message and receive message #95

dantepsychedelico opened this issue Jun 19, 2020 · 1 comment

Comments

@dantepsychedelico
Copy link

Hi,
I have a question, when I use more than three consumers and producers together, I can't send any message and got message. I have no idea to debug it.

'use strict';

let Pulsar = require('pulsar-client');

(async function main() {
  let client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  let topics = [
    'non-persistent://public/default/foo1',
    'non-persistent://public/default/foo2',
    'non-persistent://public/default/foo3',
    'non-persistent://public/default/foo4',
  ];
  let producers = {};
  for (let topic of topics) {
    let producer = await client.createProducer({
      topic: topic,
    });
    producers[topic] = producer;
  }

  console.log('---producer-done');
  let consumers = {};
  for (let topic of topics) {
    let consumer = await client.subscribe({
      topic: topic,
      subscription: 'sub1',
      subscriptionType: 'Failover',
      receiverQueueSize: 100000,
      receiverQueueSizeAcrossPartitions: 100000,
    });
    consumers[topic] = consumer;
  }

  for (let topic in consumers) {
    (async function(topic) {
      while (true) {
        let now = Date.now();
        try {
          console.log('wait', topic);
          const consumer = consumers[topic];
          const msg = await consumer.receive(1000*60*30);
          let new_now = Date.now();
          console.log('get', topic, JSON.parse(msg.getData().toString()), new_now-now);
          now = new_now;
          consumer.acknowledge(msg);
        } catch(err) {
          console.log('timeout', topic);
        }
      }
    })(topic)
  }

  console.log('---completed');

  for (let topic in producers) {
    setInterval(async() => {
      const producer = producers[topic];
      console.log('send', topic);
      let now = Date.now();
      await producer.send({
        data: Buffer.from(JSON.stringify({now})),
      });
      console.log('send done', topic);
    }, 500);
  }
})();
@dantepsychedelico
Copy link
Author

I change receive message from promise to listener Ref, the problem is sloved now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant