Skip to content

Commit

Permalink
refactor(rebalance): use smart_ptr to manage pull request (#206)
Browse files Browse the repository at this point in the history
refactor(rebalance): use smart_ptr to manage pull request
  • Loading branch information
ShannonDing authored Dec 11, 2019
1 parent 4e90be9 commit bc5adc5
Show file tree
Hide file tree
Showing 23 changed files with 588 additions and 423 deletions.
15 changes: 4 additions & 11 deletions include/DefaultMQPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
virtual void getSubscriptions(std::vector<SubscriptionData>&);
virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
virtual void removeConsumeOffset(const MQMessageQueue& mq);
virtual void producePullMsgTask(PullRequest*);
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest);
virtual Rebalance* getRebalance() const;
//<!end MQConsumer;

Expand All @@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
* if subExpression is setted to "null" or "*"all msg will be subscribed
* if subExpression is setted to "null" or "*", all msg will be subscribed
* @param offset
* specify the started pull offset
* @param maxNums
Expand All @@ -90,7 +90,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
* if subExpression is setted to "null" or "*"all msg will be subscribed
* if subExpression is setted to "null" or "*", all msg will be subscribed
* @param offset
* specify the started pull offset
* @param maxNums
Expand All @@ -107,20 +107,13 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {

virtual ConsumerRunningInfo* getConsumerRunningInfo() { return NULL; }
/**
* 获取消费进度,返回-1表示出错
*
* @param mq
* @param fromStore
* @return
*/
int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
/**
* 根据topic获取MessageQueue,以均衡方式在组内多个成员之间分配
*
* @param topic
* 消息Topic
* @return 返回队列集合
*/

void fetchMessageQueuesInBalance(const std::string& topic, std::vector<MQMessageQueue> mqs);

// temp persist consumer offset interface, only valid with
Expand Down
14 changes: 9 additions & 5 deletions include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,17 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
virtual Rebalance* getRebalance() const;
ConsumeMsgService* getConsumerMsgService() const;

virtual void producePullMsgTask(PullRequest*);
void triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* request);
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>);
virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int millis);
static void static_triggerNextPullRequest(void* context,
boost::asio::deadline_timer* t,
boost::weak_ptr<PullRequest>);
void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr<PullRequest>);
void runPullMsgQueue(TaskQueue* pTaskQueue);
void pullMessage(PullRequest* pullrequest); // sync pullMsg
void pullMessageAsync(PullRequest* pullrequest); // async pullMsg
void pullMessage(boost::weak_ptr<PullRequest> pullrequest); // sync pullMsg
void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest); // async pullMsg
void setAsyncPull(bool asyncFlag);
AsyncPullCallback* getAsyncPullCallBack(PullRequest* request, MQMessageQueue msgQueue);
AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>, MQMessageQueue msgQueue);
void shutdownAsyncPullCallBack();

/*
Expand Down
2 changes: 1 addition & 1 deletion include/MQConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
virtual ConsumeType getConsumeType() = 0;
virtual ConsumeFromWhere getConsumeFromWhere() = 0;
virtual void getSubscriptions(std::vector<SubscriptionData>&) = 0;
virtual void producePullMsgTask(PullRequest*) = 0;
virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>) = 0;
virtual Rebalance* getRebalance() const = 0;
virtual PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums) = 0;
virtual void pull(const MQMessageQueue& mq,
Expand Down
8 changes: 5 additions & 3 deletions src/MQClientFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1040,10 +1040,12 @@ void MQClientFactory::resetOffset(const string& group,

for (; it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
boost::weak_ptr<PullRequest> pullRequest = pConsumer->getRebalance()->getPullRequest(mq);
boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
// PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
pullreq->setDropped(true);
LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/AllocateMQStrategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,5 @@ class AllocateMQAveragely : public AllocateMQStrategy {
};

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq
#endif
33 changes: 24 additions & 9 deletions src/consumer/ConsumeMessageConcurrentlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,35 @@ MessageListenerType ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListe
return m_pMessageListener->getMessageListenerType();
}

void ConsumeMessageConcurrentlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
if (request->isDropped()) {
LOG_INFO("Pull request for %s is dropped, which will be released in next re-balance.",
request->m_messageQueue.toString().c_str());
return;
}
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
}

void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
if (!request || request->isDroped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
if (!request || request->isDropped()) {
LOG_WARN("the pull request had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}

//<!¶ÁÈ¡Êý¾Ý;
if (msgs.empty()) {
LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
return;
Expand Down Expand Up @@ -123,12 +139,11 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vec

// update offset
int64 offset = request->removeMessage(msgs);
// LOG_DEBUG("update offset:%lld of mq: %s",
// offset,(request->m_messageQueue).toString().c_str());
if (offset >= 0) {
m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
} else {
LOG_WARN("Note: accumulation consume occurs on mq:%s", (request->m_messageQueue).toString().c_str());
LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
(request->m_messageQueue).toString().c_str());
}
}

Expand All @@ -144,4 +159,4 @@ void ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& ms
}

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq
33 changes: 27 additions & 6 deletions src/consumer/ConsumeMessageOrderlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,25 @@ MessageListenerType ConsumeMessageOrderlyService::getConsumeMsgSerivceListenerTy
return m_pMessageListener->getMessageListenerType();
}

void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {
void ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
vector<MQMessageExt>& msgs) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, this, request));
}

void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* context,
PullRequest* request,
boost::weak_ptr<PullRequest> pullRequest,
bool tryLockMQ,
boost::asio::deadline_timer* t) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
LOG_INFO("submit consumeRequest later for mq:%s", request->m_messageQueue.toString().c_str());
vector<MQMessageExt> msgs;
ConsumeMessageOrderlyService* orderlyService = (ConsumeMessageOrderlyService*)context;
Expand All @@ -122,7 +133,12 @@ void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex
deleteAndZero(t);
}

void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
bool bGetMutex = false;
boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
if (!lock.owns_lock()) {
Expand All @@ -140,7 +156,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
// request->m_messageQueue.toString().c_str());
return;
}
if (!request || request->isDroped()) {
if (!request || request->isDropped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
Expand Down Expand Up @@ -189,11 +205,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
}
}
}
void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ) {
void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ) {
boost::shared_ptr<PullRequest> request = pullRequest.lock();
if (!request) {
LOG_WARN("Pull request has been released");
return;
}
int retryTimer = tryLockMQ ? 500 : 100;
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer));
t->async_wait(
boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t));
}
}
} // namespace rocketmq
16 changes: 8 additions & 8 deletions src/consumer/ConsumeMsgService.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ConsumeMsgService {
virtual void start() {}
virtual void shutdown() {}
virtual void stopThreadPool() {}
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) {}
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs) {}
virtual MessageListenerType getConsumeMsgSerivceListenerType() { return messageListenerDefaultly; }
};

Expand All @@ -48,11 +48,11 @@ class ConsumeMessageConcurrentlyService : public ConsumeMsgService {
virtual ~ConsumeMessageConcurrentlyService();
virtual void start();
virtual void shutdown();
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);
virtual MessageListenerType getConsumeMsgSerivceListenerType();
virtual void stopThreadPool();

void ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
void ConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);

private:
void resetRetryTopic(vector<MQMessageExt>& msgs);
Expand All @@ -71,17 +71,17 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
virtual ~ConsumeMessageOrderlyService();
virtual void start();
virtual void shutdown();
virtual void submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, vector<MQMessageExt>& msgs);
virtual void stopThreadPool();
virtual MessageListenerType getConsumeMsgSerivceListenerType();

void boost_asio_work();
void tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ);
void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool tryLockMQ);
static void static_submitConsumeRequestLater(void* context,
PullRequest* request,
boost::weak_ptr<PullRequest> request,
bool tryLockMQ,
boost::asio::deadline_timer* t);
void ConsumeRequest(PullRequest* request);
void ConsumeRequest(boost::weak_ptr<PullRequest> request);
void lockMQPeriodically(boost::system::error_code& ec, boost::asio::deadline_timer* t);
void unlockAllMQ();
bool lockOneMQ(const MQMessageQueue& mq);
Expand All @@ -99,6 +99,6 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService {
};

//<!***************************************************************************
} //<!end namespace;
} // namespace rocketmq

#endif //<! _CONSUMEMESSAGESERVICE_H_
6 changes: 4 additions & 2 deletions src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,13 @@ void DefaultMQPullConsumer::getSubscriptions(vector<SubscriptionData>& result) {
}
}

void DefaultMQPullConsumer::producePullMsgTask(PullRequest*) {}
bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest) {
return true;
}

Rebalance* DefaultMQPullConsumer::getRebalance() const {
return NULL;
}

//<!************************************************************************
} //<!end namespace;
} // namespace rocketmq
Loading

0 comments on commit bc5adc5

Please sign in to comment.