Skip to content

Commit

Permalink
Change consumer listener to use original consumer pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
gperinazzo committed Dec 19, 2019
1 parent 5f3d0a3 commit 5a04aeb
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 118 deletions.
5 changes: 3 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
},
"dependencies": {
"bindings": "^1.3.1",
"node-addon-api": "nodejs/node-addon-api",
"node-addon-api": "^2.0.0",
"node-gyp": "^3.8.0",
"node-pre-gyp": "^0.12.0"
},
Expand Down
43 changes: 24 additions & 19 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,20 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
constructor.SuppressDestruct();
}

void Consumer::SetCConsumer(pulsar_consumer_t *cConsumer) { this->cConsumer = cConsumer; }
void Consumer::SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer) { this->wrapper = cConsumer; }
void Consumer::SetListenerCallback(ListenerCallback *listener) { this->listener = listener; }

Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info) {}
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}

class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
public:
ConsumerNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
ConsumerConfig *consumerConfig)
ConsumerConfig *consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper)
: AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
deferred(deferred),
cClient(cClient),
consumerConfig(consumerConfig) {}
consumerConfig(consumerConfig),
consumerWrapper(consumerWrapper) {}
~ConsumerNewInstanceWorker() {}
void Execute() {
const std::string &topic = this->consumerConfig->GetTopic();
Expand All @@ -77,9 +78,9 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
return;
}

pulsar_result result =
pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
this->consumerConfig->GetCConsumerConfig(), &this->cConsumer);
pulsar_result result = pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(),
this->consumerConfig->GetCConsumerConfig(),
&this->consumerWrapper->cConsumer);
if (result != pulsar_result_Ok) {
SetError(std::string("Failed to create consumer: ") + pulsar_result_str(result));
} else {
Expand All @@ -91,7 +92,7 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
void OnOK() {
Napi::Object obj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(obj);
consumer->SetCConsumer(this->cConsumer);
consumer->SetCConsumer(this->consumerWrapper);
consumer->SetListenerCallback(this->listener);
this->deferred.Resolve(obj);
}
Expand All @@ -103,13 +104,16 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
pulsar_consumer_t *cConsumer;
ConsumerConfig *consumerConfig;
ListenerCallback *listener;
std::shared_ptr<CConsumerWrapper> consumerWrapper;
};

Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();
ConsumerConfig *consumerConfig = new ConsumerConfig(config);
ConsumerNewInstanceWorker *wk = new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig);
std::shared_ptr<CConsumerWrapper> consumerWrapper = std::make_shared<CConsumerWrapper>();
ConsumerConfig *consumerConfig = new ConsumerConfig(config, consumerWrapper);
ConsumerNewInstanceWorker *wk =
new ConsumerNewInstanceWorker(deferred, cClient, consumerConfig, consumerWrapper);
wk->Queue();
return deferred.Promise();
}
Expand Down Expand Up @@ -151,11 +155,12 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
if (info[0].IsUndefined()) {
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer);
wk->Queue();
} else {
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
ConsumerReceiveWorker *wk =
new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, timeout.Int64Value());
wk->Queue();
}
return deferred.Promise();
Expand All @@ -164,25 +169,26 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
pulsar_consumer_acknowledge_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
}

void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
MessageId *msgId = MessageId::Unwrap(obj);
pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL, NULL);
}

void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL);
pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, msg->GetCMessage(), NULL, NULL);
}

void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
MessageId *msgId = MessageId::Unwrap(obj);
pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL);
pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, msgId->GetCMessageId(), NULL,
NULL);
}

class ConsumerCloseWorker : public Napi::AsyncWorker {
Expand Down Expand Up @@ -210,15 +216,14 @@ class ConsumerCloseWorker : public Napi::AsyncWorker {

Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->cConsumer);
ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, this->wrapper->cConsumer);
wk->Queue();
return deferred.Promise();
}

Consumer::~Consumer() {
pulsar_consumer_pause_message_listener(this->cConsumer);
pulsar_consumer_free(this->cConsumer);
if (this->listener) {
pulsar_consumer_pause_message_listener(this->wrapper->cConsumer);
this->listener->callback.Release();
}
}
5 changes: 2 additions & 3 deletions src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
static Napi::FunctionReference constructor;
Consumer(const Napi::CallbackInfo &info);
~Consumer();
void SetCConsumer(pulsar_consumer_t *cConsumer);
void SetCConsumer(std::shared_ptr<CConsumerWrapper> cConsumer);
void SetListenerCallback(ListenerCallback *listener);
pulsar_consumer_t *GetCConsumer();

private:
pulsar_consumer_t *cConsumer;
std::shared_ptr<CConsumerWrapper> wrapper;
ListenerCallback *listener;

Napi::Value Receive(const Napi::CallbackInfo &info);
Expand Down
122 changes: 32 additions & 90 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
*/

#include "ConsumerConfig.h"
#include "Consumer.h"
#include "Message.h"
#include <pulsar/c/consumer_configuration.h>
#include <pulsar/c/consumer.h>
#include <map>
#include <mutex>
#include <condition_variable>

static const std::string CFG_TOPIC = "topic";
static const std::string CFG_SUBSCRIPTION = "subscription";
Expand All @@ -41,105 +40,37 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Failover", pulsar_ConsumerFailover}};

struct MessageListenerProxyData {
pulsar_consumer_t *cConsumer;
std::shared_ptr<CConsumerWrapper> consumerWrapper;
pulsar_message_t *cMessage;
std::mutex mutex;
std::condition_variable cv;
bool finished;

MessageListenerProxyData(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage)
: cConsumer(cConsumer), cMessage(cMessage), finished(false) {}

void Wait() {
std::unique_lock<std::mutex> lk(this->mutex);
this->cv.wait(lk, [=]() { return this->finished; });
}

void Notify() {
std::lock_guard<std::mutex> lk(this->mutex);
this->finished = true;
this->cv.notify_all();
}

bool Finished() {
std::lock_guard<std::mutex> lk(this->mutex);
return this->finished;
}
MessageListenerProxyData(std::shared_ptr<CConsumerWrapper> consumerWrapper, pulsar_message_t *cMessage)
: consumerWrapper(consumerWrapper), cMessage(cMessage) {}
};

void Acknowledge(const Napi::CallbackInfo &info) {
Napi::Object obj = info[0].As<Napi::Object>();
Message *msg = Message::Unwrap(obj);
std::shared_ptr<MessageListenerProxyData> *listenerData =
(std::shared_ptr<MessageListenerProxyData> *)info.Data();
// We can't use the consumer after it has been notified, the consumer will be destrotyed after the first
// time we call notify
if (!(*listenerData)->Finished()) {
pulsar_consumer_acknowledge_async((*listenerData)->cConsumer, msg->GetCMessage(), NULL, NULL);
}
(*listenerData)->Notify();
}

void NegativeAcknowledge(const Napi::CallbackInfo &info) {
std::shared_ptr<MessageListenerProxyData> *listenerData =
(std::shared_ptr<MessageListenerProxyData> *)info.Data();
// Nack isn't available on the C client version 2.3.0
// This just releases the message listener thread
(*listenerData)->Notify();
}

void FinalizeMessageListenerProxy(napi_env env, void *data, void *) {
std::shared_ptr<MessageListenerProxyData> *listenerData = (std::shared_ptr<MessageListenerProxyData> *)data;
(*listenerData)->Notify();
delete listenerData;
}

void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback,
std::shared_ptr<MessageListenerProxyData> *data) {
Napi::Object obj = Message::NewInstance({}, (*data)->cMessage);
std::shared_ptr<MessageListenerProxyData> *dataCopy = new std::shared_ptr<MessageListenerProxyData>(*data);

Napi::Function acknowledge = Napi::Function::New(env, &Acknowledge, "Acknowledge", data);
Napi::Function negativeAcknowledge =
Napi::Function::New(env, &NegativeAcknowledge, "NegativeAcknowledge", dataCopy);

napi_status ackFinalizerStatus =
napi_wrap(env, acknowledge, data, &FinalizeMessageListenerProxy, NULL, NULL);
if (ackFinalizerStatus != napi_ok) {
(*data)->Notify();
delete dataCopy;
delete data;
return;
}

napi_status nackFinalizerStatus =
napi_wrap(env, negativeAcknowledge, dataCopy, &FinalizeMessageListenerProxy, NULL, NULL);
if (nackFinalizerStatus != napi_ok) {
(*dataCopy)->Notify();
delete dataCopy;
return;
}
jsCallback.Call({obj, acknowledge, negativeAcknowledge});
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Napi::Object consumerObj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(consumerObj);
consumer->SetCConsumer(std::move(data->consumerWrapper));
delete data;
jsCallback.Call({msg, consumerObj});
}

void MessageListener(pulsar_consumer_t *cConsumer, pulsar_message_t *cMessage, void *ctx) {
Napi::ThreadSafeFunction *listenerCallback = (Napi::ThreadSafeFunction *)ctx;
if (listenerCallback->Acquire() != napi_ok) {
ListenerCallback *listenerCallback = (ListenerCallback *)ctx;
if (listenerCallback->callback.Acquire() != napi_ok) {
return;
};
std::shared_ptr<MessageListenerProxyData> data =
std::make_shared<MessageListenerProxyData>(cConsumer, cMessage);
std::shared_ptr<MessageListenerProxyData> *dataPtr = new std::shared_ptr<MessageListenerProxyData>(data);
*dataPtr = data;
listenerCallback->BlockingCall(dataPtr, MessageListenerProxy);
listenerCallback->Release();

data->Wait();
MessageListenerProxyData *dataPtr =
new MessageListenerProxyData(listenerCallback->consumerWrapper, cMessage);
listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
listenerCallback->callback.Release();
}

void FinalizeListenerCallback(Napi::Env env, ListenerCallback *cb, void *) { delete cb; }

ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig,
std::shared_ptr<CConsumerWrapper> consumerWrapper)
: topic(""), subscription(""), ackTimeoutMs(0), listener(nullptr) {
this->cConsumerConfig = pulsar_consumer_configuration_create();

Expand Down Expand Up @@ -201,18 +132,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)

if (consumerConfig.Has(CFG_LISTENER) && consumerConfig.Get(CFG_LISTENER).IsFunction()) {
this->listener = new ListenerCallback();
this->listener->consumerWrapper = consumerWrapper;
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
consumerConfig.Env(), consumerConfig.Get(CFG_LISTENER).As<Napi::Function>(), "Listener Callback", 1,
1, (void *)NULL, FinalizeListenerCallback, listener);
this->listener->callback = std::move(callback);
pulsar_consumer_configuration_set_message_listener(this->cConsumerConfig, &MessageListener,
&this->listener->callback);
this->listener);
}
}

ConsumerConfig::~ConsumerConfig() {
pulsar_consumer_configuration_free(this->cConsumerConfig);
if (this->listener) this->listener->callback.Release();
if (this->listener) {
this->listener->callback.Release();
}
}

pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return this->cConsumerConfig; }
Expand All @@ -226,3 +160,11 @@ ListenerCallback *ConsumerConfig::GetListenerCallback() {
}

int64_t ConsumerConfig::GetAckTimeoutMs() { return this->ackTimeoutMs; }

CConsumerWrapper::CConsumerWrapper() : cConsumer(nullptr) {}

CConsumerWrapper::~CConsumerWrapper() {
if (this->cConsumer) {
pulsar_consumer_free(this->cConsumer);
}
}
9 changes: 8 additions & 1 deletion src/ConsumerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@

#define MIN_ACK_TIMEOUT_MILLIS 10000

struct CConsumerWrapper {
pulsar_consumer_t *cConsumer;
CConsumerWrapper();
~CConsumerWrapper();
};

struct ListenerCallback {
Napi::ThreadSafeFunction callback;
std::shared_ptr<CConsumerWrapper> consumerWrapper;
};

class ConsumerConfig {
public:
ConsumerConfig(const Napi::Object &consumerConfig);
ConsumerConfig(const Napi::Object &consumerConfig, std::shared_ptr<CConsumerWrapper> consumerWrapper);
~ConsumerConfig();
pulsar_consumer_configuration_t *GetCConsumerConfig();
std::string GetTopic();
Expand Down
4 changes: 2 additions & 2 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ const Pulsar = require('../index.js');
topic,
subscription: 'sub1',
ackTimeoutMs: 10000,
listener: (message, ack) => {
listener: (message, messageConsumer) => {
const data = message.getData().toString();
results.push(data);
ack(message);
messageConsumer.acknowledge(message);
if (results.length === 10) finish();
},
});
Expand Down

0 comments on commit 5a04aeb

Please sign in to comment.